NIFI-5769: Refactored FlowController to use Composition over Inheritance - Ensure that when root group is set, that we register its ID in FlowManager
This closes #3132. Signed-off-by: Bryan Bende <bbe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/931bb0bc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/931bb0bc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/931bb0bc Branch: refs/heads/master Commit: 931bb0bc3b1c0205b260261ce9730af87204e115 Parents: 59e102a Author: Mark Payne <marka...@hotmail.com> Authored: Fri Oct 26 10:20:08 2018 -0400 Committer: Bryan Bende <bbe...@apache.org> Committed: Tue Nov 6 11:23:33 2018 -0500 ---------------------------------------------------------------------- .../nifi/controller/AbstractComponentNode.java | 45 +- .../apache/nifi/controller/ComponentNode.java | 14 +- .../apache/nifi/controller/ProcessorNode.java | 22 +- .../nifi/controller/flow/FlowManager.java | 290 ++ .../service/ControllerServiceProvider.java | 32 +- .../nifi/logging/LogRepositoryFactory.java | 4 +- .../nifi/reporting/UserAwareEventAccess.java | 69 + .../validation/TriggerValidationTask.java | 14 +- .../nifi/controller/ExtensionBuilder.java | 470 +++ .../apache/nifi/controller/FlowController.java | 3038 ++---------------- .../org/apache/nifi/controller/FlowSnippet.java | 47 + .../nifi/controller/StandardFlowService.java | 74 +- .../nifi/controller/StandardFlowSnippet.java | 619 ++++ .../controller/StandardFlowSynchronizer.java | 130 +- .../nifi/controller/StandardProcessorNode.java | 102 +- .../controller/StandardReloadComponent.java | 209 ++ .../controller/flow/StandardFlowManager.java | 656 ++++ .../controller/kerberos/KerberosConfig.java | 45 + .../server/StandardLoadBalanceProtocol.java | 2 +- .../reporting/StandardReportingContext.java | 8 +- .../StandardReportingInitializationContext.java | 28 +- .../reporting/StandardReportingTaskNode.java | 13 +- .../repository/StandardQueueProvider.java | 45 + .../scheduling/StandardProcessScheduler.java | 14 +- .../serialization/StandardFlowSerializer.java | 4 +- .../service/ControllerServiceLoader.java | 19 +- .../service/GhostControllerService.java | 82 + ...dControllerServiceInitializationContext.java | 20 +- .../StandardControllerServiceProvider.java | 257 +- .../controller/state/StandardStateManager.java | 6 +- .../nifi/controller/tasks/ConnectableTask.java | 8 +- .../nifi/controller/tasks/ExpireFlowFiles.java | 8 +- .../nifi/groups/StandardProcessGroup.java | 141 +- .../repository/StandardLogRepository.java | 6 +- .../StandardProcessorInitializationContext.java | 20 +- .../provenance/ComponentIdentifierLookup.java | 71 + .../StandardProvenanceAuthorizableFactory.java | 119 + .../nifi/remote/StandardRemoteProcessGroup.java | 81 +- .../nifi/reporting/StandardEventAccess.java | 691 ++++ .../controller/StandardFlowServiceSpec.groovy | 10 +- .../nifi/controller/TestFlowController.java | 101 +- .../controller/TestStandardProcessorNode.java | 12 +- .../queue/clustered/LoadBalancedQueueIT.java | 6 +- .../server/TestStandardLoadBalanceProtocol.java | 5 +- .../reporting/TestStandardReportingContext.java | 2 +- .../scheduling/ProcessorLifecycleIT.java | 277 +- .../scheduling/StandardProcessSchedulerIT.java | 100 - .../TestStandardProcessScheduler.java | 143 +- .../StandardFlowSerializerTest.java | 6 +- .../StandardControllerServiceProviderIT.java | 58 +- .../StandardControllerServiceProviderTest.java | 55 +- .../TestStandardControllerServiceProvider.java | 196 +- .../service/mock/MockProcessGroup.java | 8 +- .../StandardExtensionDiscoveringManager.java | 2 +- .../apache/nifi/web/api/VersionsResource.java | 4 +- .../org/apache/nifi/web/api/dto/DtoFactory.java | 3 +- .../nifi/web/controller/ControllerFacade.java | 114 +- .../web/controller/ControllerSearchService.java | 2 +- .../apache/nifi/web/dao/impl/ComponentDAO.java | 2 +- .../web/dao/impl/StandardConnectionDAO.java | 8 +- .../dao/impl/StandardControllerServiceDAO.java | 26 +- .../nifi/web/dao/impl/StandardFunnelDAO.java | 8 +- .../nifi/web/dao/impl/StandardInputPortDAO.java | 10 +- .../nifi/web/dao/impl/StandardLabelDAO.java | 8 +- .../web/dao/impl/StandardOutputPortDAO.java | 10 +- .../web/dao/impl/StandardProcessGroupDAO.java | 25 +- .../nifi/web/dao/impl/StandardProcessorDAO.java | 15 +- .../dao/impl/StandardRemoteProcessGroupDAO.java | 12 +- .../nifi/web/dao/impl/StandardSnippetDAO.java | 24 +- .../nifi/web/dao/impl/StandardTemplateDAO.java | 16 +- .../ControllerServiceProviderFactoryBean.java | 2 +- .../org/apache/nifi/web/util/SnippetUtils.java | 4 +- .../src/main/resources/nifi-web-api-context.xml | 15 +- .../web/dao/impl/StandardTemplateDAOSpec.groovy | 12 +- .../impl/TestStandardRemoteProcessGroupDAO.java | 7 +- 75 files changed, 4837 insertions(+), 3994 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java index ab9ece0..f3ae41f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -16,26 +16,6 @@ */ package org.apache.nifi.controller; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.bundle.Bundle; @@ -58,6 +38,26 @@ import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + public abstract class AbstractComponentNode implements ComponentNode { private static final Logger logger = LoggerFactory.getLogger(AbstractComponentNode.class); @@ -85,17 +85,16 @@ public abstract class AbstractComponentNode implements ComponentNode { public AbstractComponentNode(final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, - final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger, - final boolean isExtensionMissing) { + final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger, final boolean isExtensionMissing) { this.id = id; this.validationContextFactory = validationContextFactory; this.serviceProvider = serviceProvider; this.name = new AtomicReference<>(componentType); this.componentType = componentType; this.componentCanonicalClass = componentCanonicalClass; + this.reloadComponent = reloadComponent; this.variableRegistry = variableRegistry; this.validationTrigger = validationTrigger; - this.reloadComponent = reloadComponent; this.extensionManager = extensionManager; this.isExtensionMissing = new AtomicBoolean(isExtensionMissing); } http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java index 707bb75..d0ed572 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java @@ -16,13 +16,6 @@ */ package org.apache.nifi.controller; -import java.net.URL; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.AuthorizationResult; import org.apache.nifi.authorization.AuthorizationResult.Result; @@ -39,6 +32,13 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.registry.ComponentVariableRegistry; +import java.net.URL; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + public interface ComponentNode extends ComponentAuthorizable { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index f3adab0..6e8206e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -16,14 +16,6 @@ */ package org.apache.nifi.controller; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.components.validation.ValidationTrigger; import org.apache.nifi.connectable.Connectable; @@ -40,6 +32,14 @@ import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + public abstract class ProcessorNode extends AbstractComponentNode implements Connectable { protected final AtomicReference<ScheduledState> scheduledState; @@ -176,6 +176,8 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con * initiate processor <i>start</i> task * @param administrativeYieldMillis * the amount of milliseconds to wait for administrative yield + * @param timeoutMillis the number of milliseconds to wait after triggering the Processor's @OnScheduled methods before timing out and considering + * the startup a failure. This will result in the thread being interrupted and trying again. * @param processContext * the instance of {@link ProcessContext} * @param schedulingAgentCallback @@ -186,8 +188,8 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con * value is <code>true</code> or if the Processor is in any state other than 'STOPPING' or 'RUNNING', then this method * will throw an {@link IllegalStateException}. */ - public abstract void start(ScheduledExecutorService scheduler, - long administrativeYieldMillis, ProcessContext processContext, SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping); + public abstract void start(ScheduledExecutorService scheduler, long administrativeYieldMillis, long timeoutMillis, ProcessContext processContext, + SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping); /** * Will stop the {@link Processor} represented by this {@link ProcessorNode}. http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java new file mode 100644 index 0000000..c741f33 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.flow; + +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.exception.ProcessorInstantiationException; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; + +import java.net.URL; +import java.util.Collection; +import java.util.Set; + +public interface FlowManager { + String ROOT_GROUP_ID_ALIAS = "root"; + String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow"; + + /** + * Creates a Port to use as an Input Port for receiving data via Site-to-Site communications + * + * @param id port id + * @param name port name + * @return new port + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if a Port already exists with the same id. + */ + Port createRemoteInputPort(String id, String name); + + /** + * Creates a Port to use as an Output Port for transferring data via Site-to-Site communications + * + * @param id port id + * @param name port name + * @return new port + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if a Port already exists with the same id. + */ + Port createRemoteOutputPort(String id, String name); + + /** + * Creates a new Remote Process Group with the given ID that points to the given URI + * + * @param id Remote Process Group ID + * @param uris group uris, multiple url can be specified in comma-separated format + * @return new remote process group + * @throws NullPointerException if either argument is null + * @throws IllegalArgumentException if any of the <code>uri</code>s is not a valid URI. + */ + RemoteProcessGroup createRemoteProcessGroup(String id, String uris); + + /** + * @return the ProcessGroup that is currently assigned as the Root Group + */ + ProcessGroup getRootGroup(); + + String getRootGroupId(); + + /** + * Creates an instance of the given snippet and adds the components to the given group + * + * @param group group + * @param dto dto + * + * @throws NullPointerException if either argument is null + * @throws IllegalStateException if the snippet is not valid because a + * component in the snippet has an ID that is not unique to this flow, or + * because it shares an Input Port or Output Port at the root level whose + * name already exists in the given ProcessGroup, or because the Template + * contains a Processor or a Prioritizer whose class is not valid within + * this instance of NiFi. + * @throws ProcessorInstantiationException if unable to instantiate a + * processor + */ + void instantiateSnippet(ProcessGroup group, FlowSnippetDTO dto) throws ProcessorInstantiationException; + + /** + * Indicates whether or not the two ID's point to the same ProcessGroup. If + * either id is null, will return <code>false</code>. + * + * @param id1 group id + * @param id2 other group id + * @return true if same + */ + boolean areGroupsSame(String id1, String id2); + + /** + * Creates a new instance of the FlowFilePrioritizer with the given type + * @param type the type of the prioritizer (fully qualified class name) + * @return the newly created FlowFile Prioritizer + */ + FlowFilePrioritizer createPrioritizer(String type) throws InstantiationException, IllegalAccessException, ClassNotFoundException; + + /** + * Returns the ProcessGroup with the given ID, or null if no group exists with the given ID. + * @param id id of the group + * @return the ProcessGroup with the given ID or null if none can be found + */ + ProcessGroup getGroup(String id); + + void onProcessGroupAdded(ProcessGroup group); + + void onProcessGroupRemoved(ProcessGroup group); + + + /** + * Finds the Connectable with the given ID, or null if no such Connectable exists + * @param id the ID of the Connectable + * @return the Connectable with the given ID, or null if no such Connectable exists + */ + Connectable findConnectable(String id); + + /** + * Returns the ProcessorNode with the given ID + * @param id the ID of the Processor + * @return the ProcessorNode with the given ID or null if no such Processor exists + */ + ProcessorNode getProcessorNode(String id); + + void onProcessorAdded(ProcessorNode processor); + + void onProcessorRemoved(ProcessorNode processor); + + + /** + * <p> + * Creates a new ProcessorNode with the given type and identifier and + * initializes it invoking the methods annotated with {@link org.apache.nifi.annotation.lifecycle.OnAdded}. + * </p> + * + * @param type processor type + * @param id processor id + * @param coordinate the coordinate of the bundle for this processor + * @return new processor + * @throws NullPointerException if either arg is null + */ + ProcessorNode createProcessor(String type, String id, BundleCoordinate coordinate); + + /** + * <p> + * Creates a new ProcessorNode with the given type and identifier and + * optionally initializes it. + * </p> + * + * @param type the fully qualified Processor class name + * @param id the unique ID of the Processor + * @param coordinate the bundle coordinate for this processor + * @param firstTimeAdded whether or not this is the first time this + * Processor is added to the graph. If {@code true}, will invoke methods + * annotated with the {@link org.apache.nifi.annotation.lifecycle.OnAdded} annotation. + * @return new processor node + * @throws NullPointerException if either arg is null + */ + ProcessorNode createProcessor(String type, String id, BundleCoordinate coordinate, boolean firstTimeAdded); + + /** + * <p> + * Creates a new ProcessorNode with the given type and identifier and + * optionally initializes it. + * </p> + * + * @param type the fully qualified Processor class name + * @param id the unique ID of the Processor + * @param coordinate the bundle coordinate for this processor + * @param firstTimeAdded whether or not this is the first time this + * Processor is added to the graph. If {@code true}, will invoke methods + * annotated with the {@link org.apache.nifi.annotation.lifecycle.OnAdded} annotation. + * @return new processor node + * @throws NullPointerException if either arg is null + */ + ProcessorNode createProcessor(String type, String id, BundleCoordinate coordinate, Set<URL> additionalUrls, boolean firstTimeAdded, boolean registerLogObserver); + + + + Label createLabel(String id, String text); + + Funnel createFunnel(String id); + + Port createLocalInputPort(String id, String name); + + Port createLocalOutputPort(String id, String name); + + ProcessGroup createProcessGroup(String id); + + + + void onConnectionAdded(Connection connection); + + void onConnectionRemoved(Connection connection); + + Connection getConnection(String id); + + Set<Connection> findAllConnections(); + + /** + * Creates a connection between two Connectable objects. + * + * @param id required ID of the connection + * @param name the name of the connection, or <code>null</code> to leave the connection unnamed + * @param source required source + * @param destination required destination + * @param relationshipNames required collection of relationship names + * @return the created Connection + * + * @throws NullPointerException if the ID, source, destination, or set of relationships is null. + * @throws IllegalArgumentException if <code>relationships</code> is an empty collection + */ + Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames); + + + + void onInputPortAdded(Port inputPort); + + void onInputPortRemoved(Port inputPort); + + Port getInputPort(String id); + + + + void onOutputPortAdded(Port outputPort); + + void onOutputPortRemoved(Port outputPort); + + Port getOutputPort(String id); + + + + void onFunnelAdded(Funnel funnel); + + void onFunnelRemoved(Funnel funnel); + + Funnel getFunnel(String id); + + + + ReportingTaskNode createReportingTask(String type, BundleCoordinate bundleCoordinate); + + ReportingTaskNode createReportingTask(String type, BundleCoordinate bundleCoordinate, boolean firstTimeAdded); + + ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded); + + ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, Set<URL> additionalUrls, boolean firstTimeAdded, boolean register); + + ReportingTaskNode getReportingTaskNode(String taskId); + + void removeReportingTask(ReportingTaskNode reportingTask); + + Set<ReportingTaskNode> getAllReportingTasks(); + + + + Set<ControllerServiceNode> getAllControllerServices(); + + ControllerServiceNode getControllerServiceNode(String id); + + ControllerServiceNode createControllerService(String type, String id, BundleCoordinate bundleCoordinate, Set<URL> additionalUrls, boolean firstTimeAdded, + boolean registerLogObserver); + + + Set<ControllerServiceNode> getRootControllerServices(); + + void addRootControllerService(ControllerServiceNode serviceNode); + + ControllerServiceNode getRootControllerService(String serviceIdentifier); + + void removeRootControllerService(final ControllerServiceNode service); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index 95eb6a5..15033b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -16,37 +16,26 @@ */ package org.apache.nifi.controller.service; -import java.net.URL; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; - -import org.apache.nifi.annotation.lifecycle.OnAdded; -import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.nar.ExtensionManager; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + /** * */ public interface ControllerServiceProvider extends ControllerServiceLookup { /** - * Creates a new Controller Service of the specified type and assigns it the - * given id. If <code>firstTimeadded</code> is true, calls any methods that - * are annotated with {@link OnAdded} - * - * @param type of service - * @param id of service - * @param bundleCoordinate the coordinate of the bundle for the service - * @param additionalUrls optional additional URL resources to add to the class loader of the component - * @param firstTimeAdded for service - * @return the service node + * Notifies the ControllerServiceProvider that the given Controller Service has been added to the flow + * @param serviceNode the Controller Service Node */ - ControllerServiceNode createControllerService(String type, String id, BundleCoordinate bundleCoordinate, Set<URL> additionalUrls, boolean firstTimeAdded); + void onControllerServiceAdded(ControllerServiceNode serviceNode); /** * @param id of the service @@ -114,10 +103,9 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { Future<Void> disableControllerServicesAsync(Collection<ControllerServiceNode> serviceNodes); /** - * @return a Set of all Controller Services that exist for this service - * provider + * @return a Set of all Controller Services that exist for this service provider */ - Set<ControllerServiceNode> getAllControllerServices(); + Collection<ControllerServiceNode> getNonRootControllerServices(); /** * Verifies that all running Processors and Reporting Tasks referencing the http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java index 530b6ef..3b3a072 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.logging; -import static java.util.Objects.requireNonNull; +import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; @SuppressWarnings("unchecked") public class LogRepositoryFactory { http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java new file mode 100644 index 0000000..32b2e01 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.controller.repository.RepositoryStatusReport; +import org.apache.nifi.controller.status.ProcessGroupStatus; + +public interface UserAwareEventAccess extends EventAccess { + /** + * Returns the status for components in the specified group. This request is + * made by the specified user so the results will be filtered accordingly. + * + * @param groupId group id + * @param user user making request + * @return the component status + */ + ProcessGroupStatus getGroupStatus(String groupId, NiFiUser user, int recursiveStatusDepth); + + + /** + * Returns the status for the components in the specified group with the + * specified report. This request is made by the specified user so the + * results will be filtered accordingly. + * + * @param groupId group id + * @param statusReport report + * @param user user making request + * @return the component status + */ + ProcessGroupStatus getGroupStatus(String groupId, RepositoryStatusReport statusReport, NiFiUser user); + + /** + * Returns the status for components in the specified group. This request is + * made by the specified user so the results will be filtered accordingly. + * + * @param groupId group id + * @param user user making request + * @return the component status + */ + ProcessGroupStatus getGroupStatus(String groupId, NiFiUser user); + + /** + * Returns the status for the components in the specified group with the + * specified report. This request is made by the specified user so the + * results will be filtered accordingly. + * + * @param groupId group id + * @param statusReport report + * @param user user making request + * @param recursiveStatusDepth the number of levels deep we should recurse and still include the the processors' statuses, the groups' statuses, etc. in the returned ProcessGroupStatus + * @return the component status + */ + ProcessGroupStatus getGroupStatus(String groupId, RepositoryStatusReport statusReport, NiFiUser user, int recursiveStatusDepth); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java index 0665dd2..b49d52e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java @@ -18,18 +18,18 @@ package org.apache.nifi.components.validation; import org.apache.nifi.controller.ComponentNode; -import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.flow.FlowManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TriggerValidationTask implements Runnable { private static final Logger logger = LoggerFactory.getLogger(TriggerValidationTask.class); - private final FlowController controller; + private final FlowManager flowManager; private final ValidationTrigger validationTrigger; - public TriggerValidationTask(final FlowController controller, final ValidationTrigger validationTrigger) { - this.controller = controller; + public TriggerValidationTask(final FlowManager flowManager, final ValidationTrigger validationTrigger) { + this.flowManager = flowManager; this.validationTrigger = validationTrigger; } @@ -38,15 +38,15 @@ public class TriggerValidationTask implements Runnable { try { logger.debug("Triggering validation of all components"); - for (final ComponentNode node : controller.getAllControllerServices()) { + for (final ComponentNode node : flowManager.getAllControllerServices()) { validationTrigger.trigger(node); } - for (final ComponentNode node : controller.getAllReportingTasks()) { + for (final ComponentNode node : flowManager.getAllReportingTasks()) { validationTrigger.trigger(node); } - for (final ComponentNode node : controller.getRootGroup().findAllProcessors()) { + for (final ComponentNode node : flowManager.getRootGroup().findAllProcessors()) { validationTrigger.trigger(node); } } catch (final Throwable t) { http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java new file mode 100644 index 0000000..e897dd2 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.exception.ProcessorInstantiationException; +import org.apache.nifi.controller.kerberos.KerberosConfig; +import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; +import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; +import org.apache.nifi.controller.reporting.StandardReportingTaskNode; +import org.apache.nifi.controller.service.ControllerServiceInvocationHandler; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.GhostControllerService; +import org.apache.nifi.controller.service.StandardControllerServiceInitializationContext; +import org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler; +import org.apache.nifi.controller.service.StandardControllerServiceNode; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.processor.GhostProcessor; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.processor.StandardProcessorInitializationContext; +import org.apache.nifi.processor.StandardValidationContextFactory; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; +import org.apache.nifi.reporting.GhostReportingTask; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.ReportingTask; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Proxy; +import java.net.URL; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ExtensionBuilder { + private static final Logger logger = LoggerFactory.getLogger(ExtensionBuilder.class); + + private String type; + private String identifier; + private BundleCoordinate bundleCoordinate; + private ExtensionManager extensionManager; + private Set<URL> classpathUrls; + private KerberosConfig kerberosConfig = KerberosConfig.NOT_CONFIGURED; + private ControllerServiceProvider serviceProvider; + private NodeTypeProvider nodeTypeProvider; + private VariableRegistry variableRegistry; + private ProcessScheduler processScheduler; + private ValidationTrigger validationTrigger; + private ReloadComponent reloadComponent; + private FlowController flowController; + private StateManagerProvider stateManagerProvider; + + public ExtensionBuilder type(final String type) { + this.type = type; + return this; + } + + public ExtensionBuilder identifier(final String identifier) { + this.identifier = identifier; + return this; + } + + public ExtensionBuilder bundleCoordinate(final BundleCoordinate coordinate) { + this.bundleCoordinate = coordinate; + return this; + } + + public ExtensionBuilder addClasspathUrls(final Set<URL> urls) { + if (urls == null || urls.isEmpty()) { + return this; + } + + if (this.classpathUrls == null) { + this.classpathUrls = new HashSet<>(); + } + + this.classpathUrls.addAll(urls); + return this; + } + + public ExtensionBuilder kerberosConfig(final KerberosConfig kerberosConfig) { + this.kerberosConfig = kerberosConfig; + return this; + } + + public ExtensionBuilder controllerServiceProvider(final ControllerServiceProvider serviceProvider) { + this.serviceProvider = serviceProvider; + return this; + } + + public ExtensionBuilder nodeTypeProvider(final NodeTypeProvider nodeTypeProvider) { + this.nodeTypeProvider = nodeTypeProvider; + return this; + } + + public ExtensionBuilder variableRegistry(final VariableRegistry variableRegistry) { + this.variableRegistry = variableRegistry; + return this; + } + + public ExtensionBuilder processScheduler(final ProcessScheduler scheduler) { + this.processScheduler = scheduler; + return this; + } + + public ExtensionBuilder validationTrigger(final ValidationTrigger validationTrigger) { + this.validationTrigger = validationTrigger; + return this; + } + + public ExtensionBuilder reloadComponent(final ReloadComponent reloadComponent) { + this.reloadComponent = reloadComponent; + return this; + } + + public ExtensionBuilder flowController(final FlowController flowController) { + this.flowController = flowController; + return this; + } + + public ExtensionBuilder stateManagerProvider(final StateManagerProvider stateManagerProvider) { + this.stateManagerProvider = stateManagerProvider; + return this; + } + + public ExtensionBuilder extensionManager(final ExtensionManager extensionManager) { + this.extensionManager = extensionManager; + return this; + } + + public ProcessorNode buildProcessor() { + if (identifier == null) { + throw new IllegalStateException("Processor ID must be specified"); + } + if (type == null) { + throw new IllegalStateException("Processor Type must be specified"); + } + if (bundleCoordinate == null) { + throw new IllegalStateException("Bundle Coordinate must be specified"); + } + if (extensionManager == null) { + throw new IllegalStateException("Extension Manager must be specified"); + } + if (serviceProvider == null) { + throw new IllegalStateException("Controller Service Provider must be specified"); + } + if (nodeTypeProvider == null) { + throw new IllegalStateException("Node Type Provider must be specified"); + } + if (variableRegistry == null) { + throw new IllegalStateException("Variable Registry must be specified"); + } + if (reloadComponent == null) { + throw new IllegalStateException("Reload Component must be specified"); + } + + boolean creationSuccessful = true; + LoggableComponent<Processor> loggableComponent; + try { + loggableComponent = createLoggableProcessor(); + } catch (final ProcessorInstantiationException pie) { + logger.error("Could not create Processor of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", pie); + final GhostProcessor ghostProc = new GhostProcessor(); + ghostProc.setIdentifier(identifier); + ghostProc.setCanonicalClassName(type); + loggableComponent = new LoggableComponent<>(ghostProc, bundleCoordinate, null); + creationSuccessful = false; + } + + final ProcessorNode processorNode = createProcessorNode(loggableComponent, creationSuccessful); + return processorNode; + } + + public ReportingTaskNode buildReportingTask() { + if (identifier == null) { + throw new IllegalStateException("ReportingTask ID must be specified"); + } + if (type == null) { + throw new IllegalStateException("ReportingTask Type must be specified"); + } + if (bundleCoordinate == null) { + throw new IllegalStateException("Bundle Coordinate must be specified"); + } + if (extensionManager == null) { + throw new IllegalStateException("Extension Manager must be specified"); + } + if (serviceProvider == null) { + throw new IllegalStateException("Controller Service Provider must be specified"); + } + if (nodeTypeProvider == null) { + throw new IllegalStateException("Node Type Provider must be specified"); + } + if (variableRegistry == null) { + throw new IllegalStateException("Variable Registry must be specified"); + } + if (reloadComponent == null) { + throw new IllegalStateException("Reload Component must be specified"); + } + if (flowController == null) { + throw new IllegalStateException("FlowController must be specified"); + } + + boolean creationSuccessful = true; + LoggableComponent<ReportingTask> loggableComponent; + try { + loggableComponent = createLoggableReportingTask(); + } catch (final ReportingTaskInstantiationException rtie) { + logger.error("Could not create ReportingTask of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", rtie); + final GhostReportingTask ghostReportingTask = new GhostReportingTask(); + ghostReportingTask.setIdentifier(identifier); + ghostReportingTask.setCanonicalClassName(type); + loggableComponent = new LoggableComponent<>(ghostReportingTask, bundleCoordinate, null); + creationSuccessful = false; + } + + final ReportingTaskNode taskNode = createReportingTaskNode(loggableComponent, creationSuccessful); + return taskNode; + } + + public ControllerServiceNode buildControllerService() { + if (identifier == null) { + throw new IllegalStateException("ReportingTask ID must be specified"); + } + if (type == null) { + throw new IllegalStateException("ReportingTask Type must be specified"); + } + if (bundleCoordinate == null) { + throw new IllegalStateException("Bundle Coordinate must be specified"); + } + if (extensionManager == null) { + throw new IllegalStateException("Extension Manager must be specified"); + } + if (serviceProvider == null) { + throw new IllegalStateException("Controller Service Provider must be specified"); + } + if (nodeTypeProvider == null) { + throw new IllegalStateException("Node Type Provider must be specified"); + } + if (variableRegistry == null) { + throw new IllegalStateException("Variable Registry must be specified"); + } + if (reloadComponent == null) { + throw new IllegalStateException("Reload Component must be specified"); + } + if (stateManagerProvider == null) { + throw new IllegalStateException("State Manager Provider must be specified"); + } + + try { + return createControllerServiceNode(); + } catch (final Exception e) { + logger.error("Could not create Controller Service of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", e); + return createGhostControllerServiceNode(); + } + } + + + private ProcessorNode createProcessorNode(final LoggableComponent<Processor> processor, final boolean creationSuccessful) { + final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, componentVarRegistry); + + final ProcessorNode procNode; + if (creationSuccessful) { + procNode = new StandardProcessorNode(processor, identifier, validationContextFactory, processScheduler, serviceProvider, + componentVarRegistry, reloadComponent, extensionManager, validationTrigger); + } else { + final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; + final String componentType = "(Missing) " + simpleClassName; + procNode = new StandardProcessorNode(processor, identifier, validationContextFactory, processScheduler, serviceProvider, + componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true); + } + + applyDefaultSettings(procNode); + return procNode; + } + + + private ReportingTaskNode createReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final boolean creationSuccessful) { + final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, componentVarRegistry); + final ReportingTaskNode taskNode; + if (creationSuccessful) { + taskNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler, + validationContextFactory, componentVarRegistry, reloadComponent, extensionManager, validationTrigger); + taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName()); + } else { + final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; + final String componentType = "(Missing) " + simpleClassName; + + taskNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler, validationContextFactory, + componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true); + taskNode.setName(componentType); + } + + return taskNode; + } + + private void applyDefaultSettings(final ProcessorNode processorNode) { + try { + final Class<?> procClass = processorNode.getProcessor().getClass(); + + final DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class); + if (ds != null) { + processorNode.setYieldPeriod(ds.yieldDuration()); + processorNode.setPenalizationPeriod(ds.penaltyDuration()); + processorNode.setBulletinLevel(ds.bulletinLevel()); + } + } catch (final Exception ex) { + logger.error("Error while setting default settings from DefaultSettings annotation: {}", ex.toString(), ex); + } + } + + private ControllerServiceNode createControllerServiceNode() throws ClassNotFoundException, IllegalAccessException, InstantiationException, InitializationException { + final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final Bundle bundle = extensionManager.getBundle(bundleCoordinate); + if (bundle == null) { + throw new IllegalStateException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate()); + } + + final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle, classpathUrls == null ? Collections.emptySet() : classpathUrls); + final Class<?> rawClass = Class.forName(type, true, detectedClassLoader); + Thread.currentThread().setContextClassLoader(detectedClassLoader); + + final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class); + final ControllerService serviceImpl = controllerServiceClass.newInstance(); + final StandardControllerServiceInvocationHandler invocationHandler = new StandardControllerServiceInvocationHandler(extensionManager, serviceImpl); + + // extract all interfaces... controllerServiceClass is non null so getAllInterfaces is non null + final List<Class<?>> interfaceList = ClassUtils.getAllInterfaces(controllerServiceClass); + final Class<?>[] interfaces = interfaceList.toArray(new Class<?>[0]); + + final ControllerService proxiedService; + if (detectedClassLoader == null) { + proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), interfaces, invocationHandler); + } else { + proxiedService = (ControllerService) Proxy.newProxyInstance(detectedClassLoader, interfaces, invocationHandler); + } + + logger.info("Created Controller Service of type {} with identifier {}", type, identifier); + final ComponentLog serviceLogger = new SimpleProcessLogger(identifier, serviceImpl); + final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(serviceLogger); + + final StateManager stateManager = stateManagerProvider.getStateManager(identifier); + final ControllerServiceInitializationContext initContext = new StandardControllerServiceInitializationContext(identifier, terminationAwareLogger, + serviceProvider, stateManager, kerberosConfig); + serviceImpl.initialize(initContext); + + final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(serviceImpl, bundleCoordinate, terminationAwareLogger); + final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, terminationAwareLogger); + + final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, componentVarRegistry); + final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler, + identifier, validationContextFactory, serviceProvider, componentVarRegistry, reloadComponent, extensionManager, validationTrigger); + serviceNode.setName(rawClass.getSimpleName()); + + invocationHandler.setServiceNode(serviceNode); + return serviceNode; + } finally { + if (ctxClassLoader != null) { + Thread.currentThread().setContextClassLoader(ctxClassLoader); + } + } + } + + private ControllerServiceNode createGhostControllerServiceNode() { + final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; + final String componentType = "(Missing) " + simpleClassName; + + final GhostControllerService ghostService = new GhostControllerService(identifier, type); + final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(ghostService, bundleCoordinate, null); + + final ControllerServiceInvocationHandler invocationHandler = new StandardControllerServiceInvocationHandler(extensionManager, ghostService); + + final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, variableRegistry); + final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, identifier, + validationContextFactory, serviceProvider, componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true); + + return serviceNode; + } + + private LoggableComponent<Processor> createLoggableProcessor() throws ProcessorInstantiationException { + try { + final LoggableComponent<Processor> processorComponent = createLoggableComponent(Processor.class); + + final ProcessorInitializationContext initiContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(), + serviceProvider, nodeTypeProvider, kerberosConfig); + processorComponent.getComponent().initialize(initiContext); + + return processorComponent; + } catch (final Exception e) { + throw new ProcessorInstantiationException(type, e); + } + } + + + private LoggableComponent<ReportingTask> createLoggableReportingTask() throws ReportingTaskInstantiationException { + try { + final LoggableComponent<ReportingTask> taskComponent = createLoggableComponent(ReportingTask.class); + + final String taskName = taskComponent.getComponent().getClass().getSimpleName(); + final ReportingInitializationContext config = new StandardReportingInitializationContext(identifier, taskName, + SchedulingStrategy.TIMER_DRIVEN, "1 min", taskComponent.getLogger(), serviceProvider, kerberosConfig, nodeTypeProvider); + + taskComponent.getComponent().initialize(config); + + return taskComponent; + } catch (final Exception e) { + throw new ReportingTaskInstantiationException(type, e); + } + } + + private <T extends ConfigurableComponent> LoggableComponent<T> createLoggableComponent(Class<T> nodeType) throws ClassNotFoundException, IllegalAccessException, InstantiationException { + final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final Bundle bundle = extensionManager.getBundle(bundleCoordinate); + if (bundle == null) { + throw new IllegalStateException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate()); + } + + final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle, classpathUrls == null ? Collections.emptySet() : classpathUrls); + final Class<?> rawClass = Class.forName(type, true, detectedClassLoader); + Thread.currentThread().setContextClassLoader(detectedClassLoader); + + final Object extensionInstance = rawClass.newInstance(); + final ComponentLog componentLog = new SimpleProcessLogger(identifier, extensionInstance); + final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog); + + final T cast = nodeType.cast(extensionInstance); + return new LoggableComponent<>(cast, bundleCoordinate, terminationAwareLogger); + } finally { + if (ctxClassLoader != null) { + Thread.currentThread().setContextClassLoader(ctxClassLoader); + } + } + } +}