http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 191fc65..f312096 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -35,6 +35,8 @@ import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.DataAuthorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
@@ -63,6 +65,7 @@ import 
org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
 import org.apache.nifi.controller.cluster.Heartbeater;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import 
org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
@@ -105,6 +108,7 @@ import 
org.apache.nifi.controller.serialization.FlowSerializationException;
 import org.apache.nifi.controller.serialization.FlowSerializer;
 import org.apache.nifi.controller.serialization.FlowSynchronizationException;
 import org.apache.nifi.controller.serialization.FlowSynchronizer;
+import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
@@ -150,6 +154,7 @@ import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.processor.StandardProcessorInitializationContext;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.provenance.IdentifierLookup;
@@ -184,11 +189,13 @@ import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
@@ -218,6 +225,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -279,6 +287,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final Set<RemoteSiteListener> externalSiteListeners = new 
HashSet<>();
     private final AtomicReference<CounterRepository> counterRepositoryRef;
     private final AtomicBoolean initialized = new AtomicBoolean(false);
+    private final AtomicBoolean flowSynchronized = new AtomicBoolean(false);
     private final StandardControllerServiceProvider controllerServiceProvider;
     private final Authorizer authorizer;
     private final AuditService auditService;
@@ -1012,13 +1021,14 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      *
      * @param type processor type
      * @param id processor id
+     * @param coordinate the coordinate of the bundle for this processor
      * @return new processor
      * @throws NullPointerException if either arg is null
      * @throws ProcessorInstantiationException if the processor cannot be
      * instantiated for any reason
      */
-    public ProcessorNode createProcessor(final String type, final String id) 
throws ProcessorInstantiationException {
-        return createProcessor(type, id, true);
+    public ProcessorNode createProcessor(final String type, final String id, 
final BundleCoordinate coordinate) throws ProcessorInstantiationException {
+        return createProcessor(type, id, coordinate, true);
     }
 
     /**
@@ -1029,6 +1039,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      *
      * @param type the fully qualified Processor class name
      * @param id the unique ID of the Processor
+     * @param coordinate the bundle coordinate for this processor
      * @param firstTimeAdded whether or not this is the first time this
      * Processor is added to the graph. If {@code true}, will invoke methods
      * annotated with the {@link OnAdded} annotation.
@@ -1037,39 +1048,63 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @throws ProcessorInstantiationException if the processor cannot be
      * instantiated for any reason
      */
-    public ProcessorNode createProcessor(final String type, String id, final 
boolean firstTimeAdded) throws ProcessorInstantiationException {
+    public ProcessorNode createProcessor(final String type, String id, final 
BundleCoordinate coordinate, final boolean firstTimeAdded) throws 
ProcessorInstantiationException {
+        return createProcessor(type, id, coordinate, firstTimeAdded, true);
+    }
+
+    /**
+     * <p>
+     * Creates a new ProcessorNode with the given type and identifier and
+     * optionally initializes it.
+     * </p>
+     *
+     * @param type the fully qualified Processor class name
+     * @param id the unique ID of the Processor
+     * @param coordinate the bundle coordinate for this processor
+     * @param firstTimeAdded whether or not this is the first time this
+     * Processor is added to the graph. If {@code true}, will invoke methods
+     * annotated with the {@link OnAdded} annotation.
+     * @return new processor node
+     * @throws NullPointerException if either arg is null
+     * @throws ProcessorInstantiationException if the processor cannot be
+     * instantiated for any reason
+     */
+    public ProcessorNode createProcessor(final String type, String id, final 
BundleCoordinate coordinate, final boolean firstTimeAdded, final boolean 
registerLogObserver)
+            throws ProcessorInstantiationException {
         id = id.intern();
 
         boolean creationSuccessful;
-        Processor processor;
+        LoggableComponent<Processor> processor;
         try {
-            processor = instantiateProcessor(type, id);
+            processor = instantiateProcessor(type, id, coordinate);
             creationSuccessful = true;
         } catch (final ProcessorInstantiationException pie) {
             LOG.error("Could not create Processor of type " + type + " for ID 
" + id + "; creating \"Ghost\" implementation", pie);
             final GhostProcessor ghostProc = new GhostProcessor();
             ghostProc.setIdentifier(id);
             ghostProc.setCanonicalClassName(type);
-            processor = ghostProc;
+            processor = new LoggableComponent<>(ghostProc, coordinate, null);
             creationSuccessful = false;
         }
 
-        final ComponentLog logger = new SimpleProcessLogger(id, processor);
         final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
         final ProcessorNode procNode;
         if (creationSuccessful) {
-            procNode = new StandardProcessorNode(processor, id, 
validationContextFactory, processScheduler, controllerServiceProvider, 
nifiProperties, variableRegistry, logger);
+            procNode = new StandardProcessorNode(processor, id, 
validationContextFactory, processScheduler, controllerServiceProvider, 
nifiProperties, variableRegistry);
         } else {
             final String simpleClassName = type.contains(".") ? 
StringUtils.substringAfterLast(type, ".") : type;
             final String componentType = "(Missing) " + simpleClassName;
-            procNode = new StandardProcessorNode(processor, id, 
validationContextFactory, processScheduler, controllerServiceProvider, 
componentType, type, nifiProperties, variableRegistry, logger);
+            procNode = new StandardProcessorNode(
+                    processor, id, validationContextFactory, processScheduler, 
controllerServiceProvider, componentType, type, nifiProperties, 
variableRegistry, true);
         }
 
         final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
-        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
+        if (registerLogObserver) {
+            
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
+        }
 
         try {
-            final Class<?> procClass = processor.getClass();
+            final Class<?> procClass = procNode.getProcessor().getClass();
             if(procClass.isAnnotationPresent(DefaultSettings.class)) {
                 DefaultSettings ds = 
procClass.getAnnotation(DefaultSettings.class);
                 try {
@@ -1083,27 +1118,33 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 } catch(Throwable ex) {
                     LOG.error(String.format("Error while setting penalty 
duration from DefaultSettings annotation:%s",ex.getMessage()),ex);
                 }
-                try {
-                    procNode.setBulletinLevel(ds.bulletinLevel());
-                } catch (Throwable ex) {
-                    LOG.error(String.format("Error while setting bulletin 
level from DefaultSettings annotation:%s",ex.getMessage()),ex);
-                }
 
+                // calling setBulletinLevel changes the level in the 
LogRepository so we only want to do this when
+                // the caller said to register the log observer, otherwise we 
could be changing the level when we didn't mean to
+                if (registerLogObserver) {
+                    try {
+                        procNode.setBulletinLevel(ds.bulletinLevel());
+                    } catch (Throwable ex) {
+                        LOG.error(String.format("Error while setting bulletin 
level from DefaultSettings annotation:%s", ex.getMessage()), ex);
+                    }
+                }
             }
         } catch (Throwable ex) {
             LOG.error(String.format("Error while setting default settings from 
DefaultSettings annotation: %s",ex.getMessage()),ex);
         }
 
         if (firstTimeAdded) {
-            try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
-                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
processor);
+            try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), 
procNode.getProcessor().getIdentifier())) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
procNode.getProcessor());
             } catch (final Exception e) {
-                
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+                if (registerLogObserver) {
+                    
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+                }
                 throw new ComponentLifeCycleException("Failed to invoke 
@OnAdded methods of " + procNode.getProcessor(), e);
             }
 
             if (firstTimeAdded) {
-                try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), 
processor.getIdentifier())) {
+                try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), 
procNode.getProcessor().getIdentifier())) {
                     
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
 procNode.getProcessor());
                 }
             }
@@ -1112,30 +1153,28 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return procNode;
     }
 
-    private Processor instantiateProcessor(final String type, final String 
identifier) throws ProcessorInstantiationException {
-        Processor processor;
+    private LoggableComponent<Processor> instantiateProcessor(final String 
type, final String identifier, final BundleCoordinate bundleCoordinate) throws 
ProcessorInstantiationException {
+        final Bundle processorBundle = 
ExtensionManager.getBundle(bundleCoordinate);
+        if (processorBundle == null) {
+            throw new ProcessorInstantiationException("Unable to find bundle 
for coordinate " + bundleCoordinate.getCoordinate());
+        }
 
         final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
-            final ClassLoader detectedClassLoaderForType = 
ExtensionManager.getClassLoader(type, identifier);
-            final Class<?> rawClass;
-            if (detectedClassLoaderForType == null) {
-                // try to find from the current class loader
-                rawClass = Class.forName(type);
-            } else {
-                // try to find from the registered classloader for that type
-                rawClass = Class.forName(type, true, 
ExtensionManager.getClassLoader(type, identifier));
-            }
-
+            final ClassLoader detectedClassLoaderForType = 
ExtensionManager.createInstanceClassLoader(type, identifier, processorBundle);
+            final Class<?> rawClass = Class.forName(type, true, 
processorBundle.getClassLoader());
             
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+
             final Class<? extends Processor> processorClass = 
rawClass.asSubclass(Processor.class);
-            processor = processorClass.newInstance();
+            final Processor processor = processorClass.newInstance();
+
             final ComponentLog componentLogger = new 
SimpleProcessLogger(identifier, processor);
             final ProcessorInitializationContext ctx = new 
StandardProcessorInitializationContext(identifier, componentLogger, this, this, 
nifiProperties);
             processor.initialize(ctx);
 
             
LogRepositoryFactory.getRepository(identifier).setLogger(componentLogger);
-            return processor;
+
+            return new LoggableComponent<>(processor, bundleCoordinate, 
componentLogger);
         } catch (final Throwable t) {
             throw new ProcessorInstantiationException(type, t);
         } finally {
@@ -1145,6 +1184,34 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
+    public void changeProcessorType(final ProcessorNode existingNode, final 
String newType, final BundleCoordinate bundleCoordinate) throws 
ProcessorInstantiationException {
+        if (existingNode == null) {
+            throw new IllegalStateException("Existing ProcessorNode cannot be 
null");
+        }
+
+        final String id = existingNode.getProcessor().getIdentifier();
+
+        // createProcessor will create a new instance class loader for the 
same id so
+        // save the instance class loader to use it for calling OnRemoved on 
the existing processor
+        final ClassLoader existingInstanceClassLoader = 
ExtensionManager.getInstanceClassLoader(id);
+
+        // create a new node with firstTimeAdded as true so lifecycle methods 
get fired
+        // attempt the creation to make sure it works before firing the 
OnRemoved methods below
+        final ProcessorNode newNode = createProcessor(newType, id, 
bundleCoordinate, true, false);
+
+        // call OnRemoved for the existing processor using the previous 
instance class loader
+        try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+            final StandardProcessContext processContext = new 
StandardProcessContext(
+                    existingNode, controllerServiceProvider, encryptor, 
getStateManagerProvider().getStateManager(id), variableRegistry);
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, 
existingNode.getProcessor(), processContext);
+        }
+
+        // set the new processor in the existing node
+        final LoggableComponent<Processor> newProcessor = new 
LoggableComponent<>(newNode.getProcessor(), newNode.getBundleCoordinate(), 
newNode.getLogger());
+        existingNode.setProcessor(newProcessor);
+        existingNode.setExtensionMissing(newNode.isExtensionMissing());
+    }
+
     /**
      * @return the ExtensionManager used for instantiating Processors,
      * Prioritizers, etc.
@@ -1459,13 +1526,16 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @throws FlowSynchronizationException if updates to the controller 
failed.
      * If this exception is thrown, then the controller should be considered
      * unsafe to be used
+     * @throws MissingBundleException if the proposed flow cannot be loaded by 
the
+     * controller because it contains a bundle that does not exist in the 
controller
      */
     public void synchronize(final FlowSynchronizer synchronizer, final 
DataFlow dataFlow)
-            throws FlowSerializationException, FlowSynchronizationException, 
UninheritableFlowException {
+            throws FlowSerializationException, FlowSynchronizationException, 
UninheritableFlowException, MissingBundleException {
         writeLock.lock();
         try {
             LOG.debug("Synchronizing controller with proposed flow");
             synchronizer.sync(this, dataFlow, encryptor);
+            flowSynchronized.set(true);
             LOG.info("Successfully synchronized controller with proposed 
flow");
         } finally {
             writeLock.unlock();
@@ -1630,7 +1700,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // Instantiate Controller Services
             //
             for (final ControllerServiceDTO controllerServiceDTO : 
dto.getControllerServices()) {
-                final ControllerServiceNode serviceNode = 
createControllerService(controllerServiceDTO.getType(), 
controllerServiceDTO.getId(), true);
+                final BundleCoordinate bundleCoordinate = 
BundleUtils.getBundle(controllerServiceDTO.getType(), 
controllerServiceDTO.getBundle());
+                final ControllerServiceNode serviceNode = 
createControllerService(controllerServiceDTO.getType(), 
controllerServiceDTO.getId(), bundleCoordinate, true);
 
                 
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
                 serviceNode.setComments(controllerServiceDTO.getComments());
@@ -1717,7 +1788,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // Instantiate the processors
             //
             for (final ProcessorDTO processorDTO : dto.getProcessors()) {
-                final ProcessorNode procNode = 
createProcessor(processorDTO.getType(), processorDTO.getId());
+                final BundleCoordinate bundleCoordinate = 
BundleUtils.getBundle(processorDTO.getType(), processorDTO.getBundle());
+                final ProcessorNode procNode = 
createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
 
                 procNode.setPosition(toPosition(processorDTO.getPosition()));
                 procNode.setProcessGroup(group);
@@ -1953,45 +2025,83 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
+    private void verifyBundleInSnippet(final BundleDTO requiredBundle, final 
Set<BundleCoordinate> supportedBundles) {
+        final BundleCoordinate requiredCoordinate = new 
BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), 
requiredBundle.getVersion());
+        if (!supportedBundles.contains(requiredCoordinate)) {
+            throw new IllegalStateException("Unsupported bundle: " + 
requiredCoordinate);
+        }
+    }
+
+    private void verifyProcessorsInSnippet(final FlowSnippetDTO 
templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+        if (templateContents.getProcessors() != null) {
+            templateContents.getProcessors().forEach(processor -> {
+                if (processor.getBundle() == null) {
+                    throw new IllegalArgumentException("Processor bundle must 
be specified.");
+                }
+
+                if (supportedTypes.containsKey(processor.getType())) {
+                    verifyBundleInSnippet(processor.getBundle(), 
supportedTypes.get(processor.getType()));
+                } else {
+                    throw new IllegalStateException("Invalid Processor Type: " 
+ processor.getType());
+                }
+            });
+        }
+
+        if (templateContents.getProcessGroups() != null) {
+            templateContents.getProcessGroups().forEach(processGroup -> {
+                verifyProcessorsInSnippet(processGroup.getContents(), 
supportedTypes);
+            });
+        }
+    }
+
+    private void verifyControllerServicesInSnippet(final FlowSnippetDTO 
templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+        if (templateContents.getControllerServices() != null) {
+            templateContents.getControllerServices().forEach(controllerService 
-> {
+                if (supportedTypes.containsKey(controllerService.getType())) {
+                    if (controllerService.getBundle() == null) {
+                        throw new IllegalArgumentException("Controller Service 
bundle must be specified.");
+                    }
+
+                    verifyBundleInSnippet(controllerService.getBundle(), 
supportedTypes.get(controllerService.getType()));
+                } else {
+                    throw new IllegalStateException("Invalid Controller 
Service Type: " + controllerService.getType());
+                }
+            });
+        }
+
+        if (templateContents.getProcessGroups() != null) {
+            templateContents.getProcessGroups().forEach(processGroup -> {
+                verifyControllerServicesInSnippet(processGroup.getContents(), 
supportedTypes);
+            });
+        }
+    }
+
     public void verifyComponentTypesInSnippet(final FlowSnippetDTO 
templateContents) {
-        // validate that all Processor Types and Prioritizer Types are valid
-        final Set<String> processorClasses = new HashSet<>();
+        final Map<String, Set<BundleCoordinate>> processorClasses = new 
HashMap<>();
         for (final Class<?> c : 
ExtensionManager.getExtensions(Processor.class)) {
-            processorClasses.add(c.getName());
+            final String name = c.getName();
+            processorClasses.put(name, 
ExtensionManager.getBundles(name).stream().map(bundle -> 
bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
         }
+        verifyProcessorsInSnippet(templateContents, processorClasses);
+
+        final Map<String, Set<BundleCoordinate>> controllerServiceClasses = 
new HashMap<>();
+        for (final Class<?> c : 
ExtensionManager.getExtensions(ControllerService.class)) {
+            final String name = c.getName();
+            controllerServiceClasses.put(name, 
ExtensionManager.getBundles(name).stream().map(bundle -> 
bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+        }
+        verifyControllerServicesInSnippet(templateContents, 
controllerServiceClasses);
+
         final Set<String> prioritizerClasses = new HashSet<>();
         for (final Class<?> c : 
ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
             prioritizerClasses.add(c.getName());
         }
-        final Set<String> controllerServiceClasses = new HashSet<>();
-        for (final Class<?> c : 
ExtensionManager.getExtensions(ControllerService.class)) {
-            controllerServiceClasses.add(c.getName());
-        }
 
-        final Set<ProcessorDTO> allProcs = new HashSet<>();
         final Set<ConnectionDTO> allConns = new HashSet<>();
-        allProcs.addAll(templateContents.getProcessors());
         allConns.addAll(templateContents.getConnections());
         for (final ProcessGroupDTO childGroup : 
templateContents.getProcessGroups()) {
-            allProcs.addAll(findAllProcessors(childGroup));
             allConns.addAll(findAllConnections(childGroup));
         }
 
-        for (final ProcessorDTO proc : allProcs) {
-            if (!processorClasses.contains(proc.getType())) {
-                throw new IllegalStateException("Invalid Processor Type: " + 
proc.getType());
-            }
-        }
-
-        final Set<ControllerServiceDTO> controllerServices = 
templateContents.getControllerServices();
-        if (controllerServices != null) {
-            for (final ControllerServiceDTO service : controllerServices) {
-                if (!controllerServiceClasses.contains(service.getType())) {
-                    throw new IllegalStateException("Invalid Controller 
Service Type: " + service.getType());
-                }
-            }
-        }
-
         for (final ConnectionDTO conn : allConns) {
             final List<String> prioritizers = conn.getPrioritizers();
             if (prioritizers != null) {
@@ -2047,24 +2157,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     /**
-     * Recursively finds all ProcessorDTO's
-     *
-     * @param group group
-     * @return processor dto set
-     */
-    private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) {
-        final Set<ProcessorDTO> procs = new HashSet<>();
-        for (final ProcessorDTO dto : group.getContents().getProcessors()) {
-            procs.add(dto);
-        }
-
-        for (final ProcessGroupDTO childGroup : 
group.getContents().getProcessGroups()) {
-            procs.addAll(findAllProcessors(childGroup));
-        }
-        return procs;
-    }
-
-    /**
      * Recursively finds all ConnectionDTO's
      *
      * @param group group
@@ -2110,16 +2202,18 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
-            final ClassLoader detectedClassLoaderForType = 
ExtensionManager.getClassLoader(type);
-            final Class<?> rawClass;
-            if (detectedClassLoaderForType == null) {
-                // try to find from the current class loader
-                rawClass = Class.forName(type);
-            } else {
-                // try to find from the registered classloader for that type
-                rawClass = Class.forName(type, true, 
ExtensionManager.getClassLoader(type));
+            final List<Bundle> prioritizerBundles = 
ExtensionManager.getBundles(type);
+            if (prioritizerBundles.size() == 0) {
+                throw new IllegalStateException(String.format("The specified 
class '%s' is not known to this nifi.", type));
+            }
+            if (prioritizerBundles.size() > 1) {
+                throw new IllegalStateException(String.format("Multiple 
bundles found for the specified class '%s', only one is allowed.", type));
             }
 
+            final Bundle bundle = prioritizerBundles.get(0);
+            final ClassLoader detectedClassLoaderForType = 
bundle.getClassLoader();
+            final Class<?> rawClass = Class.forName(type, true, 
detectedClassLoaderForType);
+
             
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
             final Class<? extends FlowFilePrioritizer> prioritizerClass = 
rawClass.asSubclass(FlowFilePrioritizer.class);
             final Object processorObj = prioritizerClass.newInstance();
@@ -2772,6 +2866,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return initialized.get();
     }
 
+    public boolean isFlowSynchronized() {
+        return flowSynchronized.get();
+    }
+
     public void startConnectable(final Connectable connectable) {
         final ProcessGroup group = 
requireNonNull(connectable).getProcessGroup();
 
@@ -2835,83 +2933,66 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         lookupGroup(groupId).stopProcessing();
     }
 
-    public ReportingTaskNode createReportingTask(final String type) throws 
ReportingTaskInstantiationException {
-        return createReportingTask(type, true);
+    public ReportingTaskNode createReportingTask(final String type, final 
BundleCoordinate bundleCoordinate) throws ReportingTaskInstantiationException {
+        return createReportingTask(type, bundleCoordinate, true);
     }
 
-    public ReportingTaskNode createReportingTask(final String type, final 
boolean firstTimeAdded) throws ReportingTaskInstantiationException {
-        return createReportingTask(type, UUID.randomUUID().toString(), 
firstTimeAdded);
+    public ReportingTaskNode createReportingTask(final String type, final 
BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) throws 
ReportingTaskInstantiationException {
+        return createReportingTask(type, UUID.randomUUID().toString(), 
bundleCoordinate, firstTimeAdded);
     }
 
     @Override
-    public ReportingTaskNode createReportingTask(final String type, final 
String id, final boolean firstTimeAdded) throws 
ReportingTaskInstantiationException {
-        return createReportingTask(type, id, firstTimeAdded, true);
+    public ReportingTaskNode createReportingTask(final String type, final 
String id, final BundleCoordinate bundleCoordinate,final boolean 
firstTimeAdded) throws ReportingTaskInstantiationException {
+        return createReportingTask(type, id, bundleCoordinate, firstTimeAdded, 
true);
     }
 
-    public ReportingTaskNode createReportingTask(final String type, final 
String id, final boolean firstTimeAdded, final boolean register) throws 
ReportingTaskInstantiationException {
-        if (type == null || id == null) {
+    public ReportingTaskNode createReportingTask(final String type, final 
String id, final BundleCoordinate bundleCoordinate, final boolean 
firstTimeAdded, final boolean register)
+            throws ReportingTaskInstantiationException {
+        if (type == null || id == null || bundleCoordinate == null) {
             throw new NullPointerException();
         }
 
-        ReportingTask task = null;
+        LoggableComponent<ReportingTask> task = null;
         boolean creationSuccessful = true;
-        final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
-            final ClassLoader detectedClassLoader = 
ExtensionManager.getClassLoader(type, id);
-            final Class<?> rawClass;
-            if (detectedClassLoader == null) {
-                rawClass = Class.forName(type);
-            } else {
-                rawClass = Class.forName(type, false, detectedClassLoader);
-            }
-
-            Thread.currentThread().setContextClassLoader(detectedClassLoader);
-            final Class<? extends ReportingTask> reportingTaskClass = 
rawClass.asSubclass(ReportingTask.class);
-            final Object reportingTaskObj = reportingTaskClass.newInstance();
-            task = reportingTaskClass.cast(reportingTaskObj);
+            task = instantiateReportingTask(type, id, bundleCoordinate);
         } catch (final Exception e) {
             LOG.error("Could not create Reporting Task of type " + type + " 
for ID " + id + "; creating \"Ghost\" implementation", e);
             final GhostReportingTask ghostTask = new GhostReportingTask();
             ghostTask.setIdentifier(id);
             ghostTask.setCanonicalClassName(type);
-            task = ghostTask;
+            task = new LoggableComponent<>(ghostTask, bundleCoordinate, null);
             creationSuccessful = false;
-        } finally {
-            if (ctxClassLoader != null) {
-                Thread.currentThread().setContextClassLoader(ctxClassLoader);
-            }
         }
 
-        final ComponentLog logger = new SimpleProcessLogger(id, task);
         final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
         final ReportingTaskNode taskNode;
         if (creationSuccessful) {
-            taskNode = new StandardReportingTaskNode(task, id, this, 
processScheduler, validationContextFactory, variableRegistry, logger);
+            taskNode = new StandardReportingTaskNode(task, id, this, 
processScheduler, validationContextFactory, variableRegistry);
         } else {
             final String simpleClassName = type.contains(".") ? 
StringUtils.substringAfterLast(type, ".") : type;
             final String componentType = "(Missing) " + simpleClassName;
 
-            taskNode = new StandardReportingTaskNode(task, id, this, 
processScheduler, validationContextFactory, componentType, type, 
variableRegistry, logger);
+            taskNode = new StandardReportingTaskNode(task, id, this, 
processScheduler, validationContextFactory, componentType, type, 
variableRegistry, true);
         }
 
-        taskNode.setName(task.getClass().getSimpleName());
+        
taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
 
         if (firstTimeAdded) {
-            final ComponentLog componentLog = new SimpleProcessLogger(id, 
taskNode.getReportingTask());
             final ReportingInitializationContext config = new 
StandardReportingInitializationContext(id, taskNode.getName(),
-                    SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, 
this, nifiProperties);
+                    SchedulingStrategy.TIMER_DRIVEN, "1 min", 
taskNode.getLogger(), this, nifiProperties);
 
             try {
-                task.initialize(config);
+                taskNode.getReportingTask().initialize(config);
             } catch (final InitializationException ie) {
                 throw new ReportingTaskInstantiationException("Failed to 
initialize reporting task of type " + type, ie);
             }
 
             try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), 
taskNode.getReportingTask().getIdentifier())) {
-                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
task);
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
taskNode.getReportingTask());
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
 taskNode.getReportingTask());
             } catch (final Exception e) {
-                throw new ComponentLifeCycleException("Failed to invoke 
On-Added Lifecycle methods of " + task, e);
+                throw new ComponentLifeCycleException("Failed to invoke 
On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
             }
         }
 
@@ -2927,6 +3008,63 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return taskNode;
     }
 
+    private LoggableComponent<ReportingTask> instantiateReportingTask(final 
String type, final String id, final BundleCoordinate bundleCoordinate)
+            throws ReportingTaskInstantiationException {
+
+        final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            final Bundle reportingTaskBundle = 
ExtensionManager.getBundle(bundleCoordinate);
+            if (reportingTaskBundle == null) {
+                throw new IllegalStateException("Unable to find bundle for 
coordinate " + bundleCoordinate.getCoordinate());
+            }
+
+            final ClassLoader detectedClassLoader = 
ExtensionManager.createInstanceClassLoader(type, id, reportingTaskBundle);
+            final Class<?> rawClass = Class.forName(type, false, 
detectedClassLoader);
+            Thread.currentThread().setContextClassLoader(detectedClassLoader);
+
+            final Class<? extends ReportingTask> reportingTaskClass = 
rawClass.asSubclass(ReportingTask.class);
+            final Object reportingTaskObj = reportingTaskClass.newInstance();
+
+            final ReportingTask reportingTask = 
reportingTaskClass.cast(reportingTaskObj);
+            final ComponentLog componentLog = new SimpleProcessLogger(id, 
reportingTask);
+
+            return new LoggableComponent<>(reportingTask, bundleCoordinate, 
componentLog);
+        } catch (final Exception e) {
+            throw new ReportingTaskInstantiationException(type, e);
+        } finally {
+            if (ctxClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(ctxClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public void changeReportingTaskType(final ReportingTaskNode existingNode, 
final String newType, final BundleCoordinate bundleCoordinate) throws 
ReportingTaskInstantiationException {
+        if (existingNode == null) {
+            throw new IllegalStateException("Existing ReportingTaskNode cannot 
be null");
+        }
+
+        final String id = existingNode.getReportingTask().getIdentifier();
+
+        // createReportingTask will create a new instance class loader for the 
same id so
+        // save the instance class loader to use it for calling OnRemoved on 
the existing processor
+        final ClassLoader existingInstanceClassLoader = 
ExtensionManager.getInstanceClassLoader(id);
+
+        // set firstTimeAdded to true so lifecycle annotations get fired, but 
don't register this node
+        // attempt the creation to make sure it works before firing the 
OnRemoved methods below
+        final ReportingTaskNode newNode = createReportingTask(newType, id, 
bundleCoordinate, true, false);
+
+        // call OnRemoved for the existing reporting task using the previous 
instance class loader
+        try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, 
existingNode.getReportingTask(), existingNode.getConfigurationContext());
+        }
+
+        // set the new reporting task into the existing node
+        final LoggableComponent<ReportingTask> newReportingTask = new 
LoggableComponent<>(newNode.getReportingTask(), newNode.getBundleCoordinate(), 
newNode.getLogger());
+        existingNode.setReportingTask(newReportingTask);
+        existingNode.setExtensionMissing(newNode.isExtensionMissing());
+    }
+
     @Override
     public ReportingTaskNode getReportingTaskNode(final String taskId) {
         return reportingTasks.get(taskId);
@@ -2988,8 +3126,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     @Override
-    public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
-        final ControllerServiceNode serviceNode = 
controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+    public ControllerServiceNode createControllerService(final String type, 
final String id, final BundleCoordinate bundleCoordinate, final boolean 
firstTimeAdded) {
+        final ControllerServiceNode serviceNode = 
controllerServiceProvider.createControllerService(type, id, bundleCoordinate, 
firstTimeAdded);
 
         // Register log observer to provide bulletins when reporting task logs 
anything at WARN level or above
         final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
@@ -3007,6 +3145,42 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return serviceNode;
     }
 
+    public void changeControllerServiceType(final ControllerServiceNode 
existingNode, final String newType, final BundleCoordinate bundleCoordinate)
+            throws ControllerServiceInstantiationException {
+        if (existingNode == null) {
+            throw new IllegalStateException("Existing ControllerServiceNode 
cannot be null");
+        }
+
+        final String id = existingNode.getIdentifier();
+
+        // createControllerService will create a new instance class loader for 
the same id so
+        // save the instance class loader to use it for calling OnRemoved on 
the existing service
+        final ClassLoader existingInstanceClassLoader = 
ExtensionManager.getInstanceClassLoader(id);
+
+        // create a new node with firstTimeAdded as true so lifecycle methods 
get called
+        // attempt the creation to make sure it works before firing the 
OnRemoved methods below
+        final ControllerServiceNode newNode = 
controllerServiceProvider.createControllerService(newType, id, 
bundleCoordinate, true);
+
+        // call OnRemoved for the existing service using the previous instance 
class loader
+        try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+            final ConfigurationContext configurationContext = new 
StandardConfigurationContext(existingNode, controllerServiceProvider, null, 
variableRegistry);
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, 
existingNode.getControllerServiceImplementation(), configurationContext);
+        }
+
+        // take the invocation handler that was created for new proxy and is 
set to look at the new node,
+        // and set it to look at the existing node
+        final ControllerServiceInvocationHandler invocationHandler = 
newNode.getInvocationHandler();
+        invocationHandler.setServiceNode(existingNode);
+
+        // create LoggableComponents for the proxy and implementation
+        final LoggableComponent<ControllerService> loggableProxy = new 
LoggableComponent<>(newNode.getProxiedControllerService(), bundleCoordinate, 
newNode.getLogger());
+        final LoggableComponent<ControllerService> loggableImplementation = 
new LoggableComponent<>(newNode.getControllerServiceImplementation(), 
bundleCoordinate, newNode.getLogger());
+
+        // set the new impl, proxy, and invocation handler into the existing 
node
+        existingNode.setControllerServiceAndProxy(loggableImplementation, 
loggableProxy, invocationHandler);
+        existingNode.setExtensionMissing(newNode.isExtensionMissing());
+    }
+
     @Override
     public void enableReportingTask(final ReportingTaskNode reportingTaskNode) 
{
         reportingTaskNode.verifyCanEnable();

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
new file mode 100644
index 0000000..175f252
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.cluster.ConnectionException;
+
+/**
+ * Represents the exceptional case when a node fails to join the cluster 
because a bundle being used by the cluster does not exist on the node.
+ */
+public class MissingBundleException extends ConnectionException {
+
+    private static final long serialVersionUID = 198234798234794L;
+
+    public MissingBundleException(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+    public MissingBundleException(Throwable cause) {
+        super(cause);
+    }
+
+    public MissingBundleException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MissingBundleException(String message) {
+        super(message);
+    }
+
+    public MissingBundleException() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
new file mode 100644
index 0000000..687d5c4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.Processor;
+
+/**
+ * Holder for StandardProcessorNode to atomically swap out the component.
+ */
+public class ProcessorDetails {
+
+    private final Processor processor;
+    private final Class<?> procClass;
+    private final boolean triggerWhenEmpty;
+    private final boolean sideEffectFree;
+    private final boolean triggeredSerially;
+    private final boolean triggerWhenAnyDestinationAvailable;
+    private final boolean eventDrivenSupported;
+    private final boolean batchSupported;
+    private final InputRequirement.Requirement inputRequirement;
+    private final ComponentLog componentLog;
+    private final BundleCoordinate bundleCoordinate;
+
+    public ProcessorDetails(final LoggableComponent<Processor> processor) {
+        this.processor = processor.getComponent();
+        this.componentLog = processor.getLogger();
+        this.bundleCoordinate = processor.getBundleCoordinate();
+
+        this.procClass = this.processor.getClass();
+        this.triggerWhenEmpty = 
procClass.isAnnotationPresent(TriggerWhenEmpty.class);
+        this.sideEffectFree = 
procClass.isAnnotationPresent(SideEffectFree.class);
+        this.batchSupported = 
procClass.isAnnotationPresent(SupportsBatching.class);
+        this.triggeredSerially = 
procClass.isAnnotationPresent(TriggerSerially.class);
+        this.triggerWhenAnyDestinationAvailable = 
procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
+        this.eventDrivenSupported = 
procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && 
!triggerWhenEmpty;
+
+        final boolean inputRequirementPresent = 
procClass.isAnnotationPresent(InputRequirement.class);
+        if (inputRequirementPresent) {
+            this.inputRequirement = 
procClass.getAnnotation(InputRequirement.class).value();
+        } else {
+            this.inputRequirement = InputRequirement.Requirement.INPUT_ALLOWED;
+        }
+    }
+
+    public Processor getProcessor() {
+        return processor;
+    }
+
+    public Class<?> getProcClass() {
+        return procClass;
+    }
+
+    public boolean isTriggerWhenEmpty() {
+        return triggerWhenEmpty;
+    }
+
+    public boolean isSideEffectFree() {
+        return sideEffectFree;
+    }
+
+    public boolean isTriggeredSerially() {
+        return triggeredSerially;
+    }
+
+    public boolean isTriggerWhenAnyDestinationAvailable() {
+        return triggerWhenAnyDestinationAvailable;
+    }
+
+    public boolean isEventDrivenSupported() {
+        return eventDrivenSupported;
+    }
+
+    public boolean isBatchSupported() {
+        return batchSupported;
+    }
+
+    public InputRequirement.Requirement getInputRequirement() {
+        return inputRequirement;
+    }
+
+    public ComponentLog getComponentLog() {
+        return componentLog;
+    }
+
+    public BundleCoordinate getBundleCoordinate() {
+        return bundleCoordinate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 2d35a63..0ce6742 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -16,39 +16,10 @@
  */
 package org.apache.nifi.controller;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.cluster.ConnectionException;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
@@ -94,6 +65,38 @@ import org.apache.nifi.web.revision.RevisionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
 public class StandardFlowService implements FlowService, ProtocolHandler {
 
     private static final String EVENT_CATEGORY = "Controller";
@@ -135,11 +138,11 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
      */
     private NodeIdentifier nodeId;
 
-    private final NiFiProperties nifiProperties;
-
     // guardedBy rwLock
     private boolean firstControllerInitialization = true;
 
+    private final NiFiProperties nifiProperties;
+
     private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to 
connect node to cluster because ";
     private static final Logger logger = 
LoggerFactory.getLogger(StandardFlowService.class);
 
@@ -434,9 +437,28 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
     }
 
     @Override
-    public void load(final DataFlow dataFlow) throws IOException, 
FlowSerializationException, FlowSynchronizationException, 
UninheritableFlowException {
+    public void load(final DataFlow dataFlow) throws IOException, 
FlowSerializationException, FlowSynchronizationException, 
UninheritableFlowException, MissingBundleException {
         if (configuredForClustering) {
-            final DataFlow proposedFlow = (dataFlow == null) ? 
createDataFlow() : dataFlow;
+            // Create the initial flow from disk if it exists, or from 
serializing the empty root group in flow controller
+            final DataFlow initialFlow = (dataFlow == null) ? createDataFlow() 
: dataFlow;
+            if (logger.isTraceEnabled()) {
+                logger.trace("InitialFlow = " + new 
String(initialFlow.getFlow(), StandardCharsets.UTF_8));
+            }
+
+            // Sync the initial flow into the flow controller so that if the 
flow came from disk we loaded the
+            // whole flow into the flow controller and applied any bundle 
upgrades
+            writeLock.lock();
+            try {
+                loadFromBytes(initialFlow, true);
+            } finally {
+                writeLock.unlock();
+            }
+
+            // Get the proposed flow by serializing the flow controller which 
now has the synced version from above
+            final DataFlow proposedFlow = createDataFlowFromController();
+            if (logger.isTraceEnabled()) {
+                logger.trace("ProposedFlow = " + new 
String(proposedFlow.getFlow(), StandardCharsets.UTF_8));
+            }
 
             /*
              * Attempt to connect to the cluster. If the manager is able to
@@ -457,9 +479,6 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                 if (response == null || response.shouldTryLater()) {
                     logger.info("Flow controller will load local dataflow and 
suspend connection handshake until a cluster connection response is received.");
 
-                    // load local proposed flow
-                    loadFromBytes(proposedFlow, false);
-
                     // set node ID on controller before we start heartbeating 
because heartbeat needs node ID
                     controller.setNodeId(nodeId);
                     clusterCoordinator.setLocalNodeIdentifier(nodeId);
@@ -479,6 +498,9 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                      */
                     controller.startHeartbeating();
 
+                    // Initialize the controller after the flow is loaded so 
we don't take any actions on repos until everything is good
+                    initializeController();
+
                     // notify controller that flow is initialized
                     try {
                         controller.onFlowInitialized(autoResumeState);
@@ -491,21 +513,26 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                 } else {
                     try {
                         loadFromConnectionResponse(response);
-                        dao.save(controller, true);
                     } catch (final Exception e) {
                         logger.error("Failed to load flow from cluster due to: 
" + e, e);
                         handleConnectionFailure(e);
                         throw new IOException(e);
                     }
                 }
+
+                // save the flow in the controller so we write out the latest 
flow with any updated bundles to disk
+                dao.save(controller, true);
+
             } finally {
                 writeLock.unlock();
             }
         } else {
             writeLock.lock();
             try {
-                // operating in standalone mode, so load proposed flow
+                // operating in standalone mode, so load proposed flow and 
initialize the controller
                 loadFromBytes(dataFlow, true);
+                initializeController();
+                dao.save(controller, true);
             } finally {
                 writeLock.unlock();
             }
@@ -516,6 +543,8 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
         DisconnectionCode disconnectionCode;
         if (ex instanceof UninheritableFlowException) {
             disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
+        } else if (ex instanceof MissingBundleException) {
+            disconnectionCode = DisconnectionCode.MISSING_BUNDLE;
         } else if (ex instanceof FlowSynchronizationException) {
             disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
         } else {
@@ -533,7 +562,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
             // create the response
             final FlowResponseMessage response = new FlowResponseMessage();
-            response.setDataFlow(createDataFlow());
+            response.setDataFlow(createDataFlowFromController());
             return response;
         } catch (final Exception ex) {
             throw new ProtocolException("Failed serializing flow controller 
state for flow request due to: " + ex, ex);
@@ -549,15 +578,15 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
     @Override
     public StandardDataFlow createDataFlow() throws IOException {
-        final byte[] snippetBytes = controller.getSnippetManager().export();
-        final byte[] authorizerFingerprint = getAuthorizerFingerprint();
-
         // Load the flow from disk if the file exists.
         if (dao.isFlowPresent()) {
             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             dao.load(baos);
             final byte[] bytes = baos.toByteArray();
-            final StandardDataFlow fromDisk = new StandardDataFlow(bytes, 
snippetBytes, authorizerFingerprint);
+
+            final byte[] snippetBytes = 
controller.getSnippetManager().export();
+            final byte[] authorizerFingerprint = getAuthorizerFingerprint();
+            final StandardDataFlow fromDisk = new StandardDataFlow(bytes, 
snippetBytes, authorizerFingerprint, new HashSet<>());
             return fromDisk;
         }
 
@@ -566,14 +595,28 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
         // will automatically create a Root Process Group, and we need to 
ensure that
         // we replicate that Process Group to all nodes in the cluster, so 
that they all
         // end up with the same ID for the root Process Group.
+        return createDataFlowFromController();
+    }
+
+    @Override
+    public StandardDataFlow createDataFlowFromController() throws IOException {
+        final byte[] snippetBytes = controller.getSnippetManager().export();
+        final byte[] authorizerFingerprint = getAuthorizerFingerprint();
+
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         dao.save(controller, baos);
         final byte[] flowBytes = baos.toByteArray();
         baos.reset();
 
-        return new StandardDataFlow(flowBytes, snippetBytes, 
authorizerFingerprint);
+        final Set<String> missingComponents = new HashSet<>();
+        controller.getRootGroup().findAllProcessors().stream().filter(p -> 
p.isExtensionMissing()).forEach(p -> missingComponents.add(p.getIdentifier()));
+        controller.getAllControllerServices().stream().filter(cs -> 
cs.isExtensionMissing()).forEach(cs -> 
missingComponents.add(cs.getIdentifier()));
+        controller.getAllReportingTasks().stream().filter(r -> 
r.isExtensionMissing()).forEach(r -> missingComponents.add(r.getIdentifier()));
+
+        return new StandardDataFlow(flowBytes, snippetBytes, 
authorizerFingerprint, missingComponents);
     }
 
+
     private NodeIdentifier getNodeId() {
         readLock.lock();
         try {
@@ -593,7 +636,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
             if (connectionResponse.getDataFlow() == null) {
                 logger.info("Received a Reconnection Request that contained no 
DataFlow. Will attempt to connect to cluster using local flow.");
-                connectionResponse = connect(false, false, createDataFlow());
+                connectionResponse = connect(false, false, 
createDataFlowFromController());
             }
 
             loadFromConnectionResponse(connectionResponse);
@@ -623,7 +666,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
         writeLock.lock();
         try {
 
-            logger.info("Disconnecting node.");
+            logger.info("Disconnecting node due to " + explanation);
 
             // mark node as not connected
             controller.setConnectionStatus(new NodeConnectionStatus(nodeId, 
DisconnectionCode.UNKNOWN, explanation));
@@ -638,7 +681,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             controller.setClustered(false, null);
             clusterCoordinator.setConnected(false);
 
-            logger.info("Node disconnected.");
+            logger.info("Node disconnected due to " + explanation);
 
         } finally {
             writeLock.unlock();
@@ -647,31 +690,30 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
     // write lock must already be acquired
     private void loadFromBytes(final DataFlow proposedFlow, final boolean 
allowEmptyFlow)
-            throws IOException, FlowSerializationException, 
FlowSynchronizationException, UninheritableFlowException {
+            throws IOException, FlowSerializationException, 
FlowSynchronizationException, UninheritableFlowException, 
MissingBundleException {
         logger.trace("Loading flow from bytes");
 
         // resolve the given flow (null means load flow from disk)
         final DataFlow actualProposedFlow;
         final byte[] flowBytes;
         final byte[] authorizerFingerprint;
+        final Set<String> missingComponents;
+
         if (proposedFlow == null) {
             final ByteArrayOutputStream flowOnDisk = new 
ByteArrayOutputStream();
             copyCurrentFlow(flowOnDisk);
             flowBytes = flowOnDisk.toByteArray();
             authorizerFingerprint = getAuthorizerFingerprint();
+            missingComponents = new HashSet<>();
             logger.debug("Loaded Flow from bytes");
         } else {
             flowBytes = proposedFlow.getFlow();
             authorizerFingerprint = proposedFlow.getAuthorizerFingerprint();
+            missingComponents = proposedFlow.getMissingComponents();
             logger.debug("Loaded flow from proposed flow");
         }
 
-        actualProposedFlow = new StandardDataFlow(flowBytes, null, 
authorizerFingerprint);
-
-        if (firstControllerInitialization) {
-            // load the controller services
-            logger.debug("Loading controller services");
-        }
+        actualProposedFlow = new StandardDataFlow(flowBytes, null, 
authorizerFingerprint, missingComponents);
 
         // load the flow
         logger.debug("Loading proposed flow into FlowController");
@@ -682,6 +724,8 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             throw new FlowSynchronizationException("Failed to load flow 
because unable to connect to cluster and local flow is empty");
         }
 
+
+
         final List<Template> templates = loadTemplates();
         for (final Template template : templates) {
             final Template existing = 
rootGroup.getTemplate(template.getIdentifier());
@@ -692,16 +736,6 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                 logger.info("Template '{}' was already present in Root Group 
so will not import from file", template.getDetails().getName());
             }
         }
-
-        // lazy initialization of controller tasks and flow
-        if (firstControllerInitialization) {
-            logger.debug("First controller initialization. Loading reporting 
tasks and initializing controller.");
-
-            // initialize the flow
-            controller.initializeFlow();
-
-            firstControllerInitialization = false;
-        }
     }
 
     /**
@@ -867,6 +901,9 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
             // get the dataflow from the response
             final DataFlow dataFlow = response.getDataFlow();
+            if (logger.isTraceEnabled()) {
+                logger.trace("ResponseFlow = " + new 
String(dataFlow.getFlow(), StandardCharsets.UTF_8));
+            }
 
             // load new controller state
             loadFromBytes(dataFlow, true);
@@ -884,6 +921,9 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
             controller.setConnectionStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED));
 
+            // Initialize the controller after the flow is loaded so we don't 
take any actions on repos until everything is good
+            initializeController();
+
             // start the processors as indicated by the dataflow
             controller.onFlowInitialized(autoResumeState);
 
@@ -892,6 +932,8 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             controller.startHeartbeating();
         } catch (final UninheritableFlowException ufe) {
             throw new 
UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is 
different than cluster flow.", ufe);
+        } catch (final MissingBundleException mbe) {
+            throw new MissingBundleException(CONNECTION_EXCEPTION_MSG_PREFIX + 
"cluster flow contains bundles that do not exist on the current node", mbe);
         } catch (final FlowSerializationException fse) {
             throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + 
"local or cluster flow is malformed.", fse);
         } catch (final FlowSynchronizationException fse) {
@@ -905,6 +947,14 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
     }
 
+    private void initializeController() throws IOException {
+        if (firstControllerInitialization) {
+            logger.debug("First controller initialization, initializing 
controller...");
+            controller.initializeFlow();
+            firstControllerInitialization = false;
+        }
+    }
+
     @Override
     public void copyCurrentFlow(final OutputStream os) throws IOException {
         readLock.lock();
@@ -939,9 +989,15 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
         @Override
         public void run() {
-            final ClassLoader currentCl = 
Thread.currentThread().getContextClassLoader();
-            final ClassLoader cl = 
NarClassLoaders.getInstance().getFrameworkClassLoader();
-            Thread.currentThread().setContextClassLoader(cl);
+            ClassLoader currentCl = null;
+
+            final Bundle frameworkBundle = 
NarClassLoaders.getInstance().getFrameworkBundle();
+            if (frameworkBundle != null) {
+                currentCl = Thread.currentThread().getContextClassLoader();
+                final ClassLoader cl = frameworkBundle.getClassLoader();
+                Thread.currentThread().setContextClassLoader(cl);
+            }
+
             try {
                 //Hang onto the SaveHolder here rather than setting it to null 
because if the save fails we will try again
                 final SaveHolder holder = 
StandardFlowService.this.saveHolder.get();

Reply via email to