http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
new file mode 100644
index 0000000..fed09af
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.nar.NarCloseable;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StandardControllerServiceInvocationHandler implements 
ControllerServiceInvocationHandler {
+
+    private static final Set<Method> validDisabledMethods;
+    static {
+        // methods that are okay to be called when the service is disabled.
+        final Set<Method> validMethods = new HashSet<>();
+        for (final Method method : ControllerService.class.getMethods()) {
+            validMethods.add(method);
+        }
+        for (final Method method : Object.class.getMethods()) {
+            validMethods.add(method);
+        }
+        validDisabledMethods = Collections.unmodifiableSet(validMethods);
+    }
+
+    private final ControllerService originalService;
+    private final AtomicReference<ControllerServiceNode> serviceNodeHolder = 
new AtomicReference<>(null);
+
+    /**
+     * @param originalService the original service being proxied
+     */
+    public StandardControllerServiceInvocationHandler(final ControllerService 
originalService) {
+        this(originalService, null);
+    }
+
+    /**
+     * @param originalService the original service being proxied
+     * @param serviceNode the node holding the original service which will be 
used for checking the state (disabled vs running)
+     */
+    public StandardControllerServiceInvocationHandler(final ControllerService 
originalService, final ControllerServiceNode serviceNode) {
+        this.originalService = originalService;
+        this.serviceNodeHolder.set(serviceNode);
+    }
+
+    public void setServiceNode(final ControllerServiceNode serviceNode) {
+        this.serviceNodeHolder.set(serviceNode);
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        final String methodName = method.getName();
+        if ("initialize".equals(methodName) || 
"onPropertyModified".equals(methodName)) {
+            throw new UnsupportedOperationException(method + " may only be 
invoked by the NiFi framework");
+        }
+
+        final ControllerServiceNode node = serviceNodeHolder.get();
+        final ControllerServiceState state = node.getState();
+        final boolean disabled = state != ControllerServiceState.ENABLED; // 
only allow method call if service state is ENABLED.
+        if (disabled && !validDisabledMethods.contains(method)) {
+            // Use nar class loader here because we are implicitly calling 
toString() on the original implementation.
+            try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(originalService.getClass(), 
originalService.getIdentifier())) {
+                throw new IllegalStateException("Cannot invoke method " + 
method + " on Controller Service " + originalService.getIdentifier()
+                        + " because the Controller Service is disabled");
+            } catch (final Throwable e) {
+                throw new IllegalStateException("Cannot invoke method " + 
method + " on Controller Service with identifier "
+                        + originalService.getIdentifier() + " because the 
Controller Service is disabled");
+            }
+        }
+
+        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(originalService.getClass(), 
originalService.getIdentifier())) {
+            return method.invoke(originalService, args);
+        } catch (final InvocationTargetException e) {
+            // If the ControllerService throws an Exception, it'll be wrapped 
in an InvocationTargetException. We want
+            // to instead re-throw what the ControllerService threw, so we 
pull it out of the InvocationTargetException.
+            throw e.getCause();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 0a64253..a543040 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -24,12 +24,15 @@ import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractConfiguredComponent;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.LoggableComponent;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
@@ -60,8 +63,7 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StandardControllerServiceNode.class);
 
-    private final ControllerService proxedControllerService;
-    private final ControllerService implementation;
+    private final AtomicReference<ControllerServiceDetails> 
controllerServiceHolder = new AtomicReference<>(null);
     private final ControllerServiceProvider serviceProvider;
     private final AtomicReference<ControllerServiceState> stateRef = new 
AtomicReference<>(ControllerServiceState.DISABLED);
 
@@ -75,28 +77,42 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
 
     private final AtomicBoolean active;
 
-    public StandardControllerServiceNode(final ControllerService 
proxiedControllerService, final ControllerService implementation, final String 
id,
-                                         final ValidationContextFactory 
validationContextFactory, final ControllerServiceProvider serviceProvider,
-                                         final VariableRegistry 
variableRegistry, final ComponentLog logger) {
+    public StandardControllerServiceNode(final 
LoggableComponent<ControllerService> implementation, final 
LoggableComponent<ControllerService> proxiedControllerService,
+                                         final 
ControllerServiceInvocationHandler invocationHandler, final String id, final 
ValidationContextFactory validationContextFactory,
+                                         final ControllerServiceProvider 
serviceProvider, final VariableRegistry variableRegistry) {
 
-        this(proxiedControllerService, implementation, id, 
validationContextFactory, serviceProvider,
-            implementation.getClass().getSimpleName(), 
implementation.getClass().getCanonicalName(), variableRegistry, logger);
+        this(implementation, proxiedControllerService, invocationHandler, id, 
validationContextFactory, serviceProvider,
+            implementation.getComponent().getClass().getSimpleName(), 
implementation.getComponent().getClass().getCanonicalName(), variableRegistry, 
false);
     }
 
-    public StandardControllerServiceNode(final ControllerService 
proxiedControllerService, final ControllerService implementation, final String 
id,
-                                         final ValidationContextFactory 
validationContextFactory, final ControllerServiceProvider serviceProvider,
-                                         final String componentType, final 
String componentCanonicalClass, final VariableRegistry variableRegistry,
-                                         final ComponentLog logger) {
+    public StandardControllerServiceNode(final 
LoggableComponent<ControllerService> implementation, final 
LoggableComponent<ControllerService> proxiedControllerService,
+                                         final 
ControllerServiceInvocationHandler invocationHandler, final String id, final 
ValidationContextFactory validationContextFactory,
+                                         final ControllerServiceProvider 
serviceProvider, final String componentType, final String 
componentCanonicalClass,
+                                         final VariableRegistry 
variableRegistry, final boolean isExtensionMissing) {
 
-        super(implementation, id, validationContextFactory, serviceProvider, 
componentType, componentCanonicalClass, variableRegistry, logger);
-        this.proxedControllerService = proxiedControllerService;
-        this.implementation = implementation;
+        super(id, validationContextFactory, serviceProvider, componentType, 
componentCanonicalClass, variableRegistry, isExtensionMissing);
         this.serviceProvider = serviceProvider;
         this.active = new AtomicBoolean();
+        setControllerServiceAndProxy(implementation, proxiedControllerService, 
invocationHandler);
 
     }
 
     @Override
+    public ConfigurableComponent getComponent() {
+        return controllerServiceHolder.get().getImplementation();
+    }
+
+    @Override
+    public ComponentLog getLogger() {
+        return controllerServiceHolder.get().getComponentLog();
+    }
+
+    @Override
+    public BundleCoordinate getBundleCoordinate() {
+        return controllerServiceHolder.get().getBundleCoordinate();
+    }
+
+    @Override
     public Authorizable getParentAuthorizable() {
         final ProcessGroup processGroup = getProcessGroup();
         if (processGroup == null) {
@@ -127,13 +143,32 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
     }
 
     @Override
+    public ControllerService getControllerServiceImplementation() {
+        return controllerServiceHolder.get().getImplementation();
+    }
+
+    @Override
     public ControllerService getProxiedControllerService() {
-        return proxedControllerService;
+        return controllerServiceHolder.get().getProxiedControllerService();
     }
 
     @Override
-    public ControllerService getControllerServiceImplementation() {
-        return implementation;
+    public ControllerServiceInvocationHandler getInvocationHandler() {
+        return controllerServiceHolder.get().getInvocationHandler();
+    }
+
+    @Override
+    public void setControllerServiceAndProxy(final 
LoggableComponent<ControllerService> implementation,
+                                             final 
LoggableComponent<ControllerService> proxiedControllerService,
+                                             final 
ControllerServiceInvocationHandler invocationHandler) {
+        synchronized (this.active) {
+            if (isActive()) {
+                throw new IllegalStateException("Cannot modify Controller 
Service configuration while service is active");
+            }
+
+            final ControllerServiceDetails controllerServiceDetails = new 
ControllerServiceDetails(implementation, proxiedControllerService, 
invocationHandler);
+            this.controllerServiceHolder.set(controllerServiceDetails);
+        }
     }
 
     @Override
@@ -211,7 +246,7 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
     @Override
     public void verifyCanDelete() {
         if (getState() != ControllerServiceState.DISABLED) {
-            throw new IllegalStateException("Controller Service " + 
implementation.getIdentifier() + " cannot be deleted because it is not 
disabled");
+            throw new IllegalStateException("Controller Service " + 
getControllerServiceImplementation().getIdentifier() + " cannot be deleted 
because it is not disabled");
         }
     }
 
@@ -236,7 +271,7 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
         }
 
         if (!activeReferencesIdentifiers.isEmpty()) {
-            throw new IllegalStateException(implementation.getIdentifier() + " 
cannot be disabled because it is referenced by " + 
activeReferencesIdentifiers.size() +
+            throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be disabled because it is referenced by " + 
activeReferencesIdentifiers.size() +
                 " components that are currently running: [" + 
StringUtils.join(activeReferencesIdentifiers, ", ") + "]");
         }
     }
@@ -244,18 +279,18 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
     @Override
     public void verifyCanEnable() {
         if (getState() != ControllerServiceState.DISABLED) {
-            throw new IllegalStateException(implementation.getIdentifier() + " 
cannot be enabled because it is not disabled");
+            throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not disabled");
         }
 
         if (!isValid()) {
-            throw new IllegalStateException(implementation.getIdentifier() + " 
cannot be enabled because it is not valid: " + getValidationErrors());
+            throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not valid: " + getValidationErrors());
         }
     }
 
     @Override
     public void verifyCanEnable(final Set<ControllerServiceNode> 
ignoredReferences) {
         if (getState() != ControllerServiceState.DISABLED) {
-            throw new IllegalStateException(implementation.getIdentifier() + " 
cannot be enabled because it is not disabled");
+            throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not disabled");
         }
 
         final Set<String> ids = new HashSet<>();
@@ -266,7 +301,7 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
         final Collection<ValidationResult> validationResults = 
getValidationErrors(ids);
         for (final ValidationResult result : validationResults) {
             if (!result.isValid()) {
-                throw new IllegalStateException(implementation.getIdentifier() 
+ " cannot be enabled because it is not valid: " + result);
+                throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not valid: " + result);
             }
         }
     }
@@ -274,7 +309,7 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
     @Override
     public void verifyCanUpdate() {
         if (getState() != ControllerServiceState.DISABLED) {
-            throw new IllegalStateException(implementation.getIdentifier() + " 
cannot be updated because it is not disabled");
+            throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be updated because it is not disabled");
         }
     }
 
@@ -335,7 +370,10 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
     @Override
     public void enable(final ScheduledExecutorService scheduler, final long 
administrativeYieldMillis) {
         if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, 
ControllerServiceState.ENABLING)) {
-            this.active.set(true);
+            synchronized (active) {
+                this.active.set(true);
+            }
+
             final ConfigurationContext configContext = new 
StandardConfigurationContext(this, this.serviceProvider, null, 
getVariableRegistry());
             scheduler.execute(new Runnable() {
                 @Override
@@ -447,4 +485,5 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
         }
         return results != null ? results : Collections.emptySet();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index e4937df..cadc8e7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -16,24 +16,11 @@
  */
 package org.apache.nifi.controller.service;
 
-import static java.util.Objects.requireNonNull;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
+import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.StateManager;
@@ -41,6 +28,7 @@ import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.LoggableComponent;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
@@ -56,7 +44,6 @@ import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.registry.VariableRegistry;
-
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.NiFiProperties;
@@ -64,30 +51,30 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
 public class StandardControllerServiceProvider implements 
ControllerServiceProvider {
 
     private static final Logger logger = 
LoggerFactory.getLogger(StandardControllerServiceProvider.class);
 
     private final ProcessScheduler processScheduler;
-    private static final Set<Method> validDisabledMethods;
     private final BulletinRepository bulletinRepo;
     private final StateManagerProvider stateManagerProvider;
     private final VariableRegistry variableRegistry;
     private final FlowController flowController;
     private final NiFiProperties nifiProperties;
 
-    static {
-        // methods that are okay to be called when the service is disabled.
-        final Set<Method> validMethods = new HashSet<>();
-        for (final Method method : ControllerService.class.getMethods()) {
-            validMethods.add(method);
-        }
-        for (final Method method : Object.class.getMethods()) {
-            validMethods.add(method);
-        }
-        validDisabledMethods = Collections.unmodifiableSet(validMethods);
-    }
-
     public StandardControllerServiceProvider(final FlowController 
flowController, final ProcessScheduler scheduler, final BulletinRepository 
bulletinRepo,
             final StateManagerProvider stateManagerProvider, final 
VariableRegistry variableRegistry, final NiFiProperties nifiProperties) {
 
@@ -99,108 +86,66 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         this.nifiProperties = nifiProperties;
     }
 
-    private Class<?>[] getInterfaces(final Class<?> cls) {
-        final List<Class<?>> allIfcs = new ArrayList<>();
-        populateInterfaces(cls, allIfcs);
-        return allIfcs.toArray(new Class<?>[allIfcs.size()]);
-    }
-
-    private void populateInterfaces(final Class<?> cls, final List<Class<?>> 
interfacesDefinedThusFar) {
-        final Class<?>[] ifc = cls.getInterfaces();
-        if (ifc != null && ifc.length > 0) {
-            for (final Class<?> i : ifc) {
-                interfacesDefinedThusFar.add(i);
-            }
-        }
-
-        final Class<?> superClass = cls.getSuperclass();
-        if (superClass != null) {
-            populateInterfaces(superClass, interfacesDefinedThusFar);
-        }
-    }
-
     private StateManager getStateManager(final String componentId) {
         return stateManagerProvider.getStateManager(componentId);
     }
 
     @Override
-    public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
-        if (type == null || id == null) {
+    public ControllerServiceNode createControllerService(final String type, 
final String id, final BundleCoordinate bundleCoordinate, final boolean 
firstTimeAdded) {
+        if (type == null || id == null || bundleCoordinate == null) {
             throw new NullPointerException();
         }
 
+        ClassLoader cl = null;
         final ClassLoader currentContextClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
-            final ClassLoader cl = ExtensionManager.getClassLoader(type, id);
             final Class<?> rawClass;
-
             try {
-                if (cl == null) {
-                    rawClass = Class.forName(type);
-                } else {
-                    Thread.currentThread().setContextClassLoader(cl);
-                    rawClass = Class.forName(type, false, cl);
+                final Bundle csBundle = 
ExtensionManager.getBundle(bundleCoordinate);
+                if (csBundle == null) {
+                    throw new ControllerServiceInstantiationException("Unable 
to find bundle for coordinate " + bundleCoordinate.getCoordinate());
                 }
+
+                cl = ExtensionManager.createInstanceClassLoader(type, id, 
csBundle);
+                Thread.currentThread().setContextClassLoader(cl);
+                rawClass = Class.forName(type, false, cl);
             } catch (final Exception e) {
                 logger.error("Could not create Controller Service of type " + 
type + " for ID " + id + "; creating \"Ghost\" implementation", e);
                 
Thread.currentThread().setContextClassLoader(currentContextClassLoader);
-                return createGhostControllerService(type, id);
+                return createGhostControllerService(type, id, 
bundleCoordinate);
             }
 
             final Class<? extends ControllerService> controllerServiceClass = 
rawClass.asSubclass(ControllerService.class);
 
             final ControllerService originalService = 
controllerServiceClass.newInstance();
-            final AtomicReference<ControllerServiceNode> serviceNodeHolder = 
new AtomicReference<>(null);
-            final InvocationHandler invocationHandler = new 
InvocationHandler() {
-                @Override
-                public Object invoke(final Object proxy, final Method method, 
final Object[] args) throws Throwable {
-
-                    final String methodName = method.getName();
-                    if ("initialize".equals(methodName) || 
"onPropertyModified".equals(methodName)) {
-                        throw new UnsupportedOperationException(method + " may 
only be invoked by the NiFi framework");
-                    }
-
-                    final ControllerServiceNode node = serviceNodeHolder.get();
-                    final ControllerServiceState state = node.getState();
-                    final boolean disabled = state != 
ControllerServiceState.ENABLED; // only allow method call if service state is 
ENABLED.
-                    if (disabled && !validDisabledMethods.contains(method)) {
-                        // Use nar class loader here because we are implicitly 
calling toString() on the original implementation.
-                        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(originalService.getClass(), 
originalService.getIdentifier())) {
-                            throw new IllegalStateException("Cannot invoke 
method " + method + " on Controller Service " + originalService.getIdentifier()
-                                    + " because the Controller Service is 
disabled");
-                        } catch (final Throwable e) {
-                            throw new IllegalStateException("Cannot invoke 
method " + method + " on Controller Service with identifier " + id + " because 
the Controller Service is disabled");
-                        }
-                    }
+            final StandardControllerServiceInvocationHandler invocationHandler 
= new StandardControllerServiceInvocationHandler(originalService);
 
-                    try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(originalService.getClass(), 
originalService.getIdentifier())) {
-                        return method.invoke(originalService, args);
-                    } catch (final InvocationTargetException e) {
-                        // If the ControllerService throws an Exception, it'll 
be wrapped in an InvocationTargetException. We want
-                        // to instead re-throw what the ControllerService 
threw, so we pull it out of the InvocationTargetException.
-                        throw e.getCause();
-                    }
-                }
-            };
+            // extract all interfaces... controllerServiceClass is non null so 
getAllInterfaces is non null
+            final List<Class<?>> interfaceList = 
ClassUtils.getAllInterfaces(controllerServiceClass);
+            final Class<?>[] interfaces = interfaceList.toArray(new 
Class<?>[interfaceList.size()]);
 
             final ControllerService proxiedService;
             if (cl == null) {
-                proxiedService = (ControllerService) 
Proxy.newProxyInstance(getClass().getClassLoader(), 
getInterfaces(controllerServiceClass), invocationHandler);
+                proxiedService = (ControllerService) 
Proxy.newProxyInstance(getClass().getClassLoader(), interfaces, 
invocationHandler);
             } else {
-                proxiedService = (ControllerService) 
Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), 
invocationHandler);
+                proxiedService = (ControllerService) 
Proxy.newProxyInstance(cl, interfaces, invocationHandler);
             }
             logger.info("Created Controller Service of type {} with identifier 
{}", type, id);
 
             final ComponentLog serviceLogger = new SimpleProcessLogger(id, 
originalService);
             originalService.initialize(new 
StandardControllerServiceInitializationContext(id, serviceLogger, this, 
getStateManager(id), nifiProperties));
 
-            final ComponentLog logger = new SimpleProcessLogger(id, 
originalService);
             final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(this, variableRegistry);
 
-            final ControllerServiceNode serviceNode = new 
StandardControllerServiceNode(proxiedService, originalService, id, 
validationContextFactory, this, variableRegistry, logger);
-            serviceNodeHolder.set(serviceNode);
+            final LoggableComponent<ControllerService> 
originalLoggableComponent = new LoggableComponent<>(originalService, 
bundleCoordinate, serviceLogger);
+            final LoggableComponent<ControllerService> 
proxiedLoggableComponent = new LoggableComponent<>(proxiedService, 
bundleCoordinate, serviceLogger);
+
+            final ControllerServiceNode serviceNode = new 
StandardControllerServiceNode(originalLoggableComponent, 
proxiedLoggableComponent, invocationHandler,
+                    id, validationContextFactory, this, variableRegistry);
             serviceNode.setName(rawClass.getSimpleName());
 
+            invocationHandler.setServiceNode(serviceNode);
+
             if (firstTimeAdded) {
                 try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(originalService.getClass(), 
originalService.getIdentifier())) {
                     ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
originalService);
@@ -219,8 +164,8 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         }
     }
 
-    private ControllerServiceNode createGhostControllerService(final String 
type, final String id) {
-        final InvocationHandler invocationHandler = new InvocationHandler() {
+    private ControllerServiceNode createGhostControllerService(final String 
type, final String id, final BundleCoordinate bundleCoordinate) {
+        final ControllerServiceInvocationHandler invocationHandler = new 
ControllerServiceInvocationHandler() {
             @Override
             public Object invoke(final Object proxy, final Method method, 
final Object[] args) throws Throwable {
                 final String methodName = method.getName();
@@ -257,6 +202,10 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
                     throw new IllegalStateException("Controller Service could 
not be created because the Controller Service Type (" + type + ") could not be 
found");
                 }
             }
+            @Override
+            public void setServiceNode(ControllerServiceNode serviceNode) {
+                // nothing to do
+            }
         };
 
         final ControllerService proxiedService = (ControllerService) 
Proxy.newProxyInstance(getClass().getClassLoader(),
@@ -265,10 +214,10 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         final String simpleClassName = type.contains(".") ? 
StringUtils.substringAfterLast(type, ".") : type;
         final String componentType = "(Missing) " + simpleClassName;
 
-        final ComponentLog logger = new SimpleProcessLogger(id, 
proxiedService);
+        final LoggableComponent<ControllerService> proxiedLoggableComponent = 
new LoggableComponent<>(proxiedService, bundleCoordinate, null);
 
-        final ControllerServiceNode serviceNode = new 
StandardControllerServiceNode(proxiedService, proxiedService, id,
-                new StandardValidationContextFactory(this, variableRegistry), 
this, componentType, type, variableRegistry, logger);
+        final ControllerServiceNode serviceNode = new 
StandardControllerServiceNode(proxiedLoggableComponent, 
proxiedLoggableComponent, invocationHandler, id,
+                new StandardValidationContextFactory(this, variableRegistry), 
this, componentType, type, variableRegistry, true);
         return serviceNode;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
index c9e7b8e..d86a120 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
@@ -26,16 +26,19 @@ import javax.net.ssl.SSLContext;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.logging.ComponentLog;
 
 public class StandardStateProviderInitializationContext implements 
StateProviderInitializationContext {
     private final String id;
     private final Map<PropertyDescriptor, PropertyValue> properties;
     private final SSLContext sslContext;
+    private final ComponentLog logger;
 
-    public StandardStateProviderInitializationContext(final String identifier, 
final Map<PropertyDescriptor, PropertyValue> properties, final SSLContext 
sslContext) {
+    public StandardStateProviderInitializationContext(final String identifier, 
final Map<PropertyDescriptor, PropertyValue> properties, final SSLContext 
sslContext, final ComponentLog logger) {
         this.id = identifier;
         this.properties = new HashMap<>(properties);
         this.sslContext = sslContext;
+        this.logger = logger;
     }
 
     @Override
@@ -57,4 +60,9 @@ public class StandardStateProviderInitializationContext 
implements StateProvider
     public SSLContext getSSLContext() {
         return sslContext;
     }
+
+    @Override
+    public ComponentLog getLogger() {
+        return logger;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index 21368e8..d63ae00 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,6 +30,7 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
@@ -44,7 +46,9 @@ import 
org.apache.nifi.controller.state.StandardStateProviderInitializationConte
 import org.apache.nifi.controller.state.config.StateManagerConfiguration;
 import org.apache.nifi.controller.state.config.StateProviderConfiguration;
 import org.apache.nifi.framework.security.util.SslContextFactory;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardValidationContext;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.util.NiFiProperties;
@@ -183,7 +187,8 @@ public class StandardStateManagerProvider implements 
StateManagerProvider{
         }
 
         final SSLContext sslContext = 
SslContextFactory.createSslContext(properties, false);
-        final StateProviderInitializationContext initContext = new 
StandardStateProviderInitializationContext(providerId, propertyMap, sslContext);
+        final ComponentLog logger = new SimpleProcessLogger(providerId, 
provider);
+        final StateProviderInitializationContext initContext = new 
StandardStateProviderInitializationContext(providerId, propertyMap, sslContext, 
logger);
 
         synchronized (provider) {
             provider.initialize(initContext);
@@ -213,15 +218,17 @@ public class StandardStateManagerProvider implements 
StateManagerProvider{
     private static StateProvider instantiateStateProvider(final String type) 
throws ClassNotFoundException, InstantiationException, IllegalAccessException {
         final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
-            final ClassLoader detectedClassLoaderForType = 
ExtensionManager.getClassLoader(type);
-            final Class<?> rawClass;
-            if (detectedClassLoaderForType == null) {
-                // try to find from the current class loader
-                rawClass = Class.forName(type);
-            } else {
-                // try to find from the registered classloader for that type
-                rawClass = Class.forName(type, true, 
ExtensionManager.getClassLoader(type));
+            final List<Bundle> bundles = ExtensionManager.getBundles(type);
+            if (bundles.size() == 0) {
+                throw new IllegalStateException(String.format("The specified 
class '%s' is not known to this nifi.", type));
             }
+            if (bundles.size() > 1) {
+                throw new IllegalStateException(String.format("Multiple 
bundles found for the specified class '%s', only one is allowed.", type));
+            }
+
+            final Bundle bundle = bundles.get(0);
+            final ClassLoader detectedClassLoaderForType = 
bundle.getClassLoader();
+            final Class<?> rawClass = Class.forName(type, true, 
detectedClassLoaderForType);
 
             
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
             final Class<? extends StateProvider> mgrClass = 
rawClass.asSubclass(StateProvider.class);

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 0e01349..8d56140 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.fingerprint;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerService;
@@ -27,7 +28,9 @@ import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.util.DomUtils;
+import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.slf4j.Logger;
@@ -337,13 +340,18 @@ public class FingerprintFactory {
         // annotation data
         appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(processorElem, "annotationData"));
 
+        // get the bundle details if possible
+        final BundleDTO bundle = 
FlowFromDOMFactory.getBundle(DomUtils.getChild(processorElem, "bundle"));
+        addBundleFingerprint(builder, bundle);
+
         // create an instance of the Processor so that we know the default 
property values
         Processor processor = null;
         try {
             if (controller != null) {
-                processor = controller.createProcessor(className, 
UUID.randomUUID().toString(), false).getProcessor();
+                final BundleCoordinate coordinate = getCoordinate(className, 
bundle);
+                processor = controller.createProcessor(className, 
UUID.randomUUID().toString(), coordinate, false).getProcessor();
             }
-        } catch (ProcessorInstantiationException e) {
+        } catch (ProcessorInstantiationException | IllegalStateException e) {
             logger.warn("Unable to create Processor of type {} due to {}; its 
default properties will be fingerprinted instead of being ignored.", className, 
e.toString());
             if (logger.isDebugEnabled()) {
                 logger.warn("", e);
@@ -565,6 +573,9 @@ public class FingerprintFactory {
         builder.append(dto.getId());
         builder.append(dto.getType());
         builder.append(dto.getName());
+
+        addBundleFingerprint(builder, dto.getBundle());
+
         builder.append(dto.getComments());
         builder.append(dto.getAnnotationData());
         builder.append(dto.getState());
@@ -573,7 +584,8 @@ public class FingerprintFactory {
         ControllerService controllerService = null;
         try {
             if (controller != null) {
-                controllerService = 
controller.createControllerService(dto.getType(), UUID.randomUUID().toString(), 
false).getControllerServiceImplementation();
+                final BundleCoordinate coordinate = 
getCoordinate(dto.getType(), dto.getBundle());
+                controllerService = 
controller.createControllerService(dto.getType(), UUID.randomUUID().toString(), 
coordinate, false).getControllerServiceImplementation();
             }
         } catch (Exception e) {
             logger.warn("Unable to create ControllerService of type {} due to 
{}; its default properties will be fingerprinted instead of being ignored.", 
dto.getType(), e.toString());
@@ -596,10 +608,37 @@ public class FingerprintFactory {
         }
     }
 
+    private void addBundleFingerprint(final StringBuilder builder, final 
BundleDTO bundle) {
+        if (bundle != null) {
+            builder.append(bundle.getGroup());
+            builder.append(bundle.getArtifact());
+            builder.append(bundle.getVersion());
+        } else {
+            builder.append("MISSING_BUNDLE");
+        }
+    }
+
+    private BundleCoordinate getCoordinate(final String type, final BundleDTO 
dto) {
+        BundleCoordinate coordinate;
+        try {
+            coordinate = BundleUtils.getCompatibleBundle(type, dto);
+        } catch (final IllegalStateException e) {
+            if (dto == null) {
+                coordinate = BundleCoordinate.UNKNOWN_COORDINATE;
+            } else {
+                coordinate = new BundleCoordinate(dto.getGroup(), 
dto.getArtifact(), dto.getVersion());
+            }
+        }
+        return coordinate;
+    }
+
     private void addReportingTaskFingerprint(final StringBuilder builder, 
final ReportingTaskDTO dto, final FlowController controller) {
         builder.append(dto.getId());
         builder.append(dto.getType());
         builder.append(dto.getName());
+
+        addBundleFingerprint(builder, dto.getBundle());
+
         builder.append(dto.getComments());
         builder.append(dto.getSchedulingPeriod());
         builder.append(dto.getSchedulingStrategy());
@@ -609,7 +648,8 @@ public class FingerprintFactory {
         ReportingTask reportingTask = null;
         try {
             if (controller != null) {
-                reportingTask = controller.createReportingTask(dto.getType(), 
UUID.randomUUID().toString(), false, false).getReportingTask();
+                final BundleCoordinate coordinate = 
getCoordinate(dto.getType(), dto.getBundle());
+                reportingTask = controller.createReportingTask(dto.getType(), 
UUID.randomUUID().toString(), coordinate, false, false).getReportingTask();
             }
         } catch (Exception e) {
             logger.warn("Unable to create ReportingTask of type {} due to {}; 
its default properties will be fingerprinted instead of being ignored.", 
dto.getType(), e.toString());

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
index 08658b1..0565eb7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
 
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.MissingBundleException;
 import org.apache.nifi.controller.UninheritableFlowException;
 import org.apache.nifi.controller.serialization.FlowSerializationException;
 import org.apache.nifi.controller.serialization.FlowSynchronizationException;
@@ -49,9 +50,10 @@ public interface FlowConfigurationDAO {
      * @throws FlowSerializationException if proposed flow is not a valid flow 
configuration file
      * @throws UninheritableFlowException if the proposed flow cannot be 
loaded by the controller because in doing so would risk orphaning flow files
      * @throws FlowSynchronizationException if updates to the controller 
failed. If this exception is thrown, then the controller should be considered 
unsafe to be used
+     * @throws MissingBundleException if the proposed flow cannot be loaded by 
the controller because it contains a bundle that does not exist in the 
controller
      */
     void load(FlowController controller, DataFlow dataFlow)
-            throws IOException, FlowSerializationException, 
FlowSynchronizationException, UninheritableFlowException;
+            throws IOException, FlowSerializationException, 
FlowSynchronizationException, UninheritableFlowException, 
MissingBundleException;
 
     /**
      * Loads the stored flow onto the given stream.

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index 765bf6f..26c5224 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -28,6 +28,7 @@ import java.util.zip.GZIPOutputStream;
 
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.MissingBundleException;
 import org.apache.nifi.controller.StandardFlowSynchronizer;
 import org.apache.nifi.controller.UninheritableFlowException;
 import org.apache.nifi.controller.serialization.FlowSerializationException;
@@ -77,7 +78,7 @@ public final class StandardXMLFlowConfigurationDAO implements 
FlowConfigurationD
 
     @Override
     public synchronized void load(final FlowController controller, final 
DataFlow dataFlow)
-            throws IOException, FlowSerializationException, 
FlowSynchronizationException, UninheritableFlowException {
+            throws IOException, FlowSerializationException, 
FlowSynchronizationException, UninheritableFlowException, 
MissingBundleException {
 
         final FlowSynchronizer flowSynchronizer = new 
StandardFlowSynchronizer(encryptor, nifiProperties);
         controller.synchronize(flowSynchronizer, dataFlow);

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java
index 8abd5cd..c8d2549 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java
@@ -79,6 +79,7 @@ public class GhostProcessor implements Processor {
 
     @Override
     public void initialize(final ProcessorInitializationContext context) {
+
     }
 
     @Override
@@ -96,4 +97,5 @@ public class GhostProcessor implements Processor {
     public String toString() {
         return "GhostProcessor[id=" + id + "]";
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java
index 167bc7a..71a8236 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java
@@ -24,12 +24,14 @@ import java.util.List;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
 
 public class GhostReportingTask implements ReportingTask {
 
     private String id;
     private String canonicalClassName;
+    private ComponentLog logger;
 
     public void setIdentifier(final String id) {
         this.id = id;
@@ -84,6 +86,7 @@ public class GhostReportingTask implements ReportingTask {
 
     @Override
     public void initialize(ReportingInitializationContext config) throws 
InitializationException {
+        this.logger = config.getLogger();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java
new file mode 100644
index 0000000..eda045a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.web.api.dto.BundleDTO;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for Bundles.
+ */
+public final class BundleUtils {
+
+    private static BundleCoordinate findBundleForType(final String type, final 
BundleCoordinate desiredCoordinate) {
+        final List<Bundle> bundles = ExtensionManager.getBundles(type);
+        if (bundles.isEmpty()) {
+            throw new IllegalStateException(String.format("%s is not known to 
this NiFi instance.", type));
+        } else if (bundles.size() > 1) {
+            if (desiredCoordinate == null) {
+                throw new IllegalStateException(String.format("Multiple 
versions of %s exist.", type));
+            } else {
+                throw new IllegalStateException(String.format("Multiple 
versions of %s exist. No exact match for %s.", type, desiredCoordinate));
+            }
+        } else {
+            return bundles.get(0).getBundleDetails().getCoordinate();
+        }
+    }
+
+    private static BundleCoordinate findCompatibleBundle(final String type, 
final BundleDTO bundleDTO, final boolean allowCompatibleBundle) {
+        final BundleCoordinate coordinate = new 
BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), 
bundleDTO.getVersion());
+        final Bundle bundle = ExtensionManager.getBundle(coordinate);
+
+        if (bundle == null) {
+            if (allowCompatibleBundle) {
+                return findBundleForType(type, coordinate);
+            } else {
+                throw new IllegalStateException(String.format("%s from %s is 
not known to this NiFi instance.", type, coordinate));
+            }
+        } else {
+            final List<BundleCoordinate> bundlesForType = 
ExtensionManager.getBundles(type).stream().map(b -> 
b.getBundleDetails().getCoordinate()).collect(Collectors.toList());
+            if (bundlesForType.contains(coordinate)) {
+                return coordinate;
+            } else {
+                throw new IllegalStateException(String.format("Found bundle %s 
but does not support %s", coordinate, type));
+            }
+        }
+    }
+
+    /**
+     * Gets a bundle that supports the specified type. If the bundle is 
specified, an
+     * exact match must be available.
+     *
+     *  <ul>
+     *      <li>If bundleDTO is specified</li>
+     *      <ul>
+     *          <li>Matching bundle found</li>
+     *          <ul>
+     *              <li>If bundle supports type, use it</li>
+     *              <li>If bundle doesn't support type, throw 
IllegalStateException</li>
+     *          </ul>
+     *          <li>No matching bundle found, IllegalStateException</li>
+     *      </ul>
+     *      <li>If bundleDTO is not specified</li>
+     *      <ul>
+     *          <li>One bundle that supports the specified type, use it</li>
+     *          <li>No bundle that supports the specified type, 
IllegalStateException</li>
+     *          <li>Multiple bundle that supports the specified type, 
IllegalStateException</li>
+     *      </ul>
+     *  </ul>
+     *
+     * @param type the component type
+     * @param bundleDTO bundle to find the component
+     * @return the bundle coordinate
+     * @throws IllegalStateException bundle not found
+     */
+    public static BundleCoordinate getBundle(final String type, final 
BundleDTO bundleDTO) {
+        if (bundleDTO == null) {
+            return findBundleForType(type, null);
+        } else {
+            return findCompatibleBundle(type, bundleDTO, false);
+        }
+    }
+
+    /**
+     * Gets a compatible bundle that supports the specified type. If the 
bundle is
+     * specified but is not available, a compatible bundle may be returned if 
there
+     * is only one.
+     *
+     *  <ul>
+     *      <li>If bundleDTO is specified</li>
+     *      <ul>
+     *          <li>Matching bundle found</li>
+     *          <ul>
+     *              <li>If bundle supports type, use it</li>
+     *              <li>If bundle doesn't support type, throw 
IllegalStateException</li>
+     *          </ul>
+     *          <li>No matching bundle found</li>
+     *          <ul>
+     *              <li>One bundle that supports the specified type, use 
it</li>
+     *              <li>No bundle that supports the specified type, 
IllegalStateException</li>
+     *              <li>Multiple bundle that supports the specified type, 
IllegalStateException</li>
+     *          </ul>
+     *      </ul>
+     *      <li>If bundleDTO is not specified</li>
+     *      <ul>
+     *          <li>One bundle that supports the specified type, use it</li>
+     *          <li>No bundle that supports the specified type, 
IllegalStateException</li>
+     *          <li>Multiple bundle that supports the specified type, 
IllegalStateException</li>
+     *      </ul>
+     *  </ul>
+     *
+     * @param type the component type
+     * @param bundleDTO bundle to find the component
+     * @return the bundle coordinate
+     * @throws IllegalStateException no compatible bundle found
+     */
+    public static BundleCoordinate getCompatibleBundle(final String type, 
final BundleDTO bundleDTO) {
+        if (bundleDTO == null) {
+            return findBundleForType(type, null);
+        } else {
+            return findCompatibleBundle(type, bundleDTO, true);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
index 02a9ca5..e21b991 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
@@ -59,6 +59,7 @@
 
             <!-- "class" is the actual Java class that performs the type of 
processing desired-->
             <xs:element name="class" type="NonEmptyStringType"/>
+            <xs:element name="bundle" type="BundleType" />
 
             <!-- the number of concurrent tasks for this configured
             processor that can be executed at any one time. This value can be 
0 
@@ -283,6 +284,14 @@
         </xs:complexContent>
     </xs:complexType>
 
+    <xs:complexType name="BundleType">
+        <xs:sequence>
+            <xs:element name="group" type="NonEmptyStringType" />
+            <xs:element name="artifact" type="NonEmptyStringType" />
+            <xs:element name="version" type="NonEmptyStringType" />
+        </xs:sequence>
+    </xs:complexType>
+
     <xs:complexType name="PositionType">
         <xs:attribute name="x" type="xs:double" use="required" />
         <xs:attribute name="y" type="xs:double" use="required" />
@@ -365,6 +374,7 @@
             <xs:element name="name" type="NonEmptyStringType" />
             <xs:element name="comment" type="xs:string" />
             <xs:element name="class" type="NonEmptyStringType" />
+            <xs:element name="bundle" type="BundleType" />
             <xs:element name="enabled" type="xs:boolean" />
                
             <xs:element name="property" type="PropertyType" minOccurs="0" 
maxOccurs="unbounded"/>
@@ -384,6 +394,7 @@
             <xs:element name="name" type="NonEmptyStringType" />
             <xs:element name="comment" type="xs:string" />
             <xs:element name="class" type="NonEmptyStringType" />
+            <xs:element name="bundle" type="BundleType" />
             <xs:element name="schedulingPeriod" type="NonEmptyStringType"/>
             <xs:element name="scheduledState" type="ScheduledState" />
             <xs:element name="schedulingStrategy" type="SchedulingStrategy" />

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
index 28b314c..738b25d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
@@ -18,27 +18,37 @@ package org.apache.nifi.controller
 
 import groovy.xml.XmlUtil
 import org.apache.nifi.authorization.Authorizer
+import org.apache.nifi.bundle.BundleCoordinate
 import org.apache.nifi.cluster.protocol.DataFlow
 import org.apache.nifi.connectable.*
 import org.apache.nifi.controller.label.Label
 import org.apache.nifi.controller.queue.FlowFileQueue
 import org.apache.nifi.groups.ProcessGroup
 import org.apache.nifi.groups.RemoteProcessGroup
+import org.apache.nifi.nar.ExtensionManager
 import org.apache.nifi.processor.Relationship
 import org.apache.nifi.reporting.BulletinRepository
 import org.apache.nifi.util.NiFiProperties
+import spock.lang.Shared
 import spock.lang.Specification
 import spock.lang.Unroll
 
 class StandardFlowSynchronizerSpec extends Specification {
-    
+
+    @Shared
+    def systemBundle;
+
     def setupSpec() {
         def propFile = 
StandardFlowSynchronizerSpec.class.getResource("/nifi.properties").getFile()
         System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile
+
+        def niFiProperties = NiFiProperties.createBasicNiFiProperties(null, 
null);
+        systemBundle = ExtensionManager.createSystemBundle(niFiProperties);
+        ExtensionManager.discoverExtensions(systemBundle, 
Collections.emptySet());
     }
 
     def teardownSpec() {
-        System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH
+
     }
 
     @Unroll
@@ -67,18 +77,26 @@ class StandardFlowSynchronizerSpec extends Specification {
         // the unit under test
         def nifiProperties = NiFiProperties.createBasicNiFiProperties(null, 
null)
         def flowSynchronizer = new 
StandardFlowSynchronizer(null,nifiProperties)
+        def firstRootGroup = Mock ProcessGroup
 
         when: "the flow is synchronized with the current state of the 
controller"
         flowSynchronizer.sync controller, proposedFlow, null
 
         then: "establish interactions for the mocked collaborators of 
StandardFlowSynchronizer to store the ending positions of components"
-        1 * controller.isInitialized() >> false
+        1 * firstRootGroup.findAllProcessors() >> []
+        1 * controller.isFlowSynchronized() >> false
         _ * controller.rootGroupId >> flowControllerXml.rootGroup.id.text()
         _ * controller.getGroup(_) >> { String id -> 
positionableMocksById.get(id) }
         _ * controller.snippetManager >> snippetManager
         _ * controller.bulletinRepository >> bulletinRepository
         _ * controller.authorizer >> authorizer
         _ * controller./set.*/(*_)
+        _ * controller.getAllControllerServices() >> []
+        _ * controller.getAllReportingTasks() >> []
+        _ * controller.getRootGroup() >>> [
+            firstRootGroup,
+            positionableMocksById.get(controller.rootGroupId)
+        ]
         _ * controller.createProcessGroup(_) >> { String pgId ->
             def processGroup = Mock(ProcessGroup)
             _ * processGroup.getIdentifier() >> pgId
@@ -112,7 +130,7 @@ class StandardFlowSynchronizerSpec extends Specification {
             return processGroup
         }
 
-        _ * controller.createProcessor(_, _, _) >> { String type, String id, 
boolean firstTimeAdded ->
+        _ * controller.createProcessor(_, _, _, _) >> { String type, String 
id, BundleCoordinate coordinate, boolean firstTimeAdded ->
             def processor = Mock(ProcessorNode)
             _ * processor.getPosition() >> { positionablePositionsById.get(id) 
}
             _ * processor.setPosition(_) >> { Position pos ->
@@ -120,6 +138,7 @@ class StandardFlowSynchronizerSpec extends Specification {
             }
             _ * processor./(add|set).*/(*_)
             _ * processor.getIdentifier() >> id
+            _ * processor.getBundleCoordinate() >> coordinate
             _ * processor.getRelationship(_) >> { String n -> new 
Relationship.Builder().name(n).build() }
             positionableMocksById.put(id, processor)
             return processor
@@ -197,6 +216,7 @@ class StandardFlowSynchronizerSpec extends Specification {
             [] as byte[]
         }
         _ * proposedFlow.authorizerFingerprint >> null
+        _ * proposedFlow.missingComponents >> []
 
         _ * flowFileQueue./set.*/(*_)
         _ * _.hashCode() >> 1

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
index 79b59d7..0bc216f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
@@ -27,6 +27,7 @@ import 
org.apache.nifi.controller.serialization.StandardFlowSerializer;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.util.FileBasedVariableRegistry;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
@@ -46,8 +47,8 @@ import org.junit.Test;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
-import org.apache.nifi.util.FileBasedVariableRegistry;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -75,6 +76,9 @@ public class StandardFlowServiceTest {
     @Before
     public void setup() throws Exception {
         properties = NiFiProperties.createBasicNiFiProperties(null, null);
+
+
+
         variableRegistry = new 
FileBasedVariableRegistry(properties.getVariableRegistryPropertiesPaths());
         mockFlowFileEventRepository = mock(FlowFileEventRepository.class);
         authorizer = mock(Authorizer.class);
@@ -88,7 +92,7 @@ public class StandardFlowServiceTest {
     @Test
     public void testLoadWithFlow() throws IOException {
         byte[] flowBytes = 
IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
-        flowService.load(new StandardDataFlow(flowBytes, null, null));
+        flowService.load(new StandardDataFlow(flowBytes, null, null, new 
HashSet<>()));
 
         FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -103,16 +107,16 @@ public class StandardFlowServiceTest {
     @Test(expected = FlowSerializationException.class)
     public void testLoadWithCorruptFlow() throws IOException {
         byte[] flowBytes = 
IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-corrupt.xml"));
-        flowService.load(new StandardDataFlow(flowBytes, null, null));
+        flowService.load(new StandardDataFlow(flowBytes, null, null, new 
HashSet<>()));
     }
 
     @Test
     public void testLoadExistingFlow() throws IOException {
         byte[] flowBytes = 
IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
-        flowService.load(new StandardDataFlow(flowBytes, null, null));
+        flowService.load(new StandardDataFlow(flowBytes, null, null, new 
HashSet<>()));
 
         flowBytes = 
IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-inheritable.xml"));
-        flowService.load(new StandardDataFlow(flowBytes, null, null));
+        flowService.load(new StandardDataFlow(flowBytes, null, null, new 
HashSet<>()));
 
         FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -126,11 +130,11 @@ public class StandardFlowServiceTest {
     @Test
     public void testLoadExistingFlowWithUninheritableFlow() throws IOException 
{
         byte[] originalBytes = 
IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
-        flowService.load(new StandardDataFlow(originalBytes, null, null));
+        flowService.load(new StandardDataFlow(originalBytes, null, null, new 
HashSet<>()));
 
         try {
             byte[] updatedBytes = 
IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-uninheritable.xml"));
-            flowService.load(new StandardDataFlow(updatedBytes, null, null));
+            flowService.load(new StandardDataFlow(updatedBytes, null, null, 
new HashSet<>()));
             fail("should have thrown " + UninheritableFlowException.class);
         } catch (UninheritableFlowException ufe) {
 
@@ -148,11 +152,11 @@ public class StandardFlowServiceTest {
     @Test
     public void testLoadExistingFlowWithCorruptFlow() throws IOException {
         byte[] originalBytes = 
IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml"));
-        flowService.load(new StandardDataFlow(originalBytes, null, null));
+        flowService.load(new StandardDataFlow(originalBytes, null, null, new 
HashSet<>()));
 
         try {
             byte[] updatedBytes = 
IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-corrupt.xml"));
-            flowService.load(new StandardDataFlow(updatedBytes, null, null));
+            flowService.load(new StandardDataFlow(updatedBytes, null, null, 
new HashSet<>()));
             fail("should have thrown " + FlowSerializationException.class);
         } catch (FlowSerializationException ufe) {
 

Reply via email to