http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index db9e68b..436215f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -16,38 +16,11 @@
  */
 package org.apache.nifi.controller;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
-
-import javax.xml.XMLConstants;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -88,11 +61,13 @@ import 
org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@@ -114,6 +89,33 @@ import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+
 /**
  */
 public class StandardFlowSynchronizer implements FlowSynchronizer {
@@ -147,7 +149,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
     @Override
     public void sync(final FlowController controller, final DataFlow 
proposedFlow, final StringEncryptor encryptor)
-            throws FlowSerializationException, UninheritableFlowException, 
FlowSynchronizationException {
+            throws FlowSerializationException, UninheritableFlowException, 
FlowSynchronizationException, MissingBundleException {
 
         // handle corner cases involving no proposed flow
         if (proposedFlow == null) {
@@ -158,15 +160,15 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             }
         }
 
-        // determine if the controller has been initialized
-        final boolean initialized = controller.isInitialized();
-        logger.debug("Synching FlowController with proposed flow: Controller 
is Initialized = {}", initialized);
+        // determine if the controller already had flow sync'd to it
+        final boolean flowAlreadySynchronized = 
controller.isFlowSynchronized();
+        logger.debug("Synching FlowController with proposed flow: Controller 
is Already Synchronized = {}", flowAlreadySynchronized);
 
         // serialize controller state to bytes
         final byte[] existingFlow;
         final boolean existingFlowEmpty;
         try {
-            if (initialized) {
+            if (flowAlreadySynchronized) {
                 existingFlow = toBytes(controller);
                 existingFlowEmpty = 
controller.getGroup(controller.getRootGroupId()).isEmpty() && 
controller.getAllReportingTasks().isEmpty() && 
controller.getAllControllerServices().isEmpty();
             } else {
@@ -232,11 +234,24 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             policyBasedAuthorizer = null;
         }
 
-        final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, 
existingSnippets, existingAuthFingerprint);
+        final Set<String> missingComponents = new HashSet<>();
+        controller.getAllControllerServices().stream().filter(cs -> 
cs.isExtensionMissing()).forEach(cs -> 
missingComponents.add(cs.getIdentifier()));
+        controller.getAllReportingTasks().stream().filter(r -> 
r.isExtensionMissing()).forEach(r -> missingComponents.add(r.getIdentifier()));
+        controller.getRootGroup().findAllProcessors().stream().filter(p -> 
p.isExtensionMissing()).forEach(p -> missingComponents.add(p.getIdentifier()));
+
+        final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, 
existingSnippets, existingAuthFingerprint, missingComponents);
+
+        Document configuration = null;
 
         // check that the proposed flow is inheritable by the controller
         try {
-            if (!existingFlowEmpty) {
+            if (existingFlowEmpty) {
+                configuration = parseFlowBytes(proposedFlow.getFlow());
+                if (configuration != null) {
+                    logger.trace("Checking bunde compatibility");
+                    checkBundleCompatibility(configuration);
+                }
+            } else {
                 logger.trace("Checking flow inheritability");
                 final String problemInheritingFlow = 
checkFlowInheritability(existingDataFlow, proposedFlow, controller);
                 if (problemInheritingFlow != null) {
@@ -247,6 +262,13 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             throw new FlowSerializationException("Failed to generate flow 
fingerprints", fe);
         }
 
+        logger.trace("Checking missing component inheritability");
+
+        final String problemInheritingMissingComponents = 
checkMissingComponentsInheritability(existingDataFlow, proposedFlow);
+        if (problemInheritingMissingComponents != null) {
+            throw new UninheritableFlowException("Proposed Flow is not 
inheritable by the flow controller because of differences in missing 
components: " + problemInheritingMissingComponents);
+        }
+
         logger.trace("Checking authorizer inheritability");
 
         final AuthorizerInheritability authInheritability = 
checkAuthorizerInheritability(existingDataFlow, proposedFlow);
@@ -256,7 +278,9 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
         // create document by parsing proposed flow bytes
         logger.trace("Parsing proposed flow bytes as DOM document");
-        final Document configuration = parseFlowBytes(proposedFlow.getFlow());
+        if (configuration == null) {
+            configuration = parseFlowBytes(proposedFlow.getFlow());
+        }
 
         // attempt to sync controller with proposed flow
         try {
@@ -282,7 +306,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
                     // if this controller isn't initialized or its empty, add 
the root group, otherwise update
                     final ProcessGroup rootGroup;
-                    if (!initialized || existingFlowEmpty) {
+                    if (!flowAlreadySynchronized || existingFlowEmpty) {
                         logger.trace("Adding root process group");
                         rootGroup = addProcessGroup(controller, /* parent 
group */ null, rootGroupElement, encryptor, encodingVersion);
                     } else {
@@ -315,7 +339,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                     final Map<ReportingTaskNode, ReportingTaskDTO> 
reportingTaskNodesToDTOs = new HashMap<>();
                     for (final Element taskElement : reportingTaskElements) {
                         final ReportingTaskDTO dto = 
FlowFromDOMFactory.getReportingTask(taskElement, encryptor);
-                        final ReportingTaskNode reportingTask = 
getOrCreateReportingTask(controller, dto, initialized, existingFlowEmpty);
+                        final ReportingTaskNode reportingTask = 
getOrCreateReportingTask(controller, dto, flowAlreadySynchronized, 
existingFlowEmpty);
                         reportingTaskNodesToDTOs.put(reportingTask, dto);
                     }
 
@@ -323,7 +347,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                     if (controllerServicesElement != null) {
                         final List<Element> serviceElements = 
DomUtils.getChildElementsByTagName(controllerServicesElement, 
"controllerService");
 
-                        if (!initialized || existingFlowEmpty) {
+                        if (!flowAlreadySynchronized || existingFlowEmpty) {
                             // If the encoding version is null, we are loading 
a flow from NiFi 0.x, where Controller
                             // Services could not be scoped by Process Group. 
As a result, we want to move the Process Groups
                             // to the root Group. Otherwise, we want to use a 
null group, which indicates a Controller-level
@@ -370,7 +394,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
                     // now that controller services are loaded and enabled we 
can apply the scheduled state to each reporting task
                     for (Map.Entry<ReportingTaskNode, ReportingTaskDTO> entry 
: reportingTaskNodesToDTOs.entrySet()) {
-                        applyReportingTaskScheduleState(controller, 
entry.getValue(), entry.getKey(), initialized, existingFlowEmpty);
+                        applyReportingTaskScheduleState(controller, 
entry.getValue(), entry.getKey(), flowAlreadySynchronized, existingFlowEmpty);
                     }
                 }
             }
@@ -402,6 +426,42 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         }
     }
 
+    private void checkBundleCompatibility(final Document configuration) {
+        final NodeList bundleNodes = 
configuration.getElementsByTagName("bundle");
+        for (int i = 0; i < bundleNodes.getLength(); i++) {
+            final Node bundleNode = bundleNodes.item(i);
+            if (bundleNode instanceof Element) {
+                final Element bundleElement = (Element) bundleNode;
+
+                final Node componentNode = bundleElement.getParentNode();
+                if (componentNode instanceof Element) {
+                    final Element componentElement = (Element) componentNode;
+                    if (!withinTemplate(componentElement)) {
+                        final String componentType = 
DomUtils.getChildText(componentElement, "class");
+                        try {
+                            BundleUtils.getBundle(componentType, 
FlowFromDOMFactory.getBundle(bundleElement));
+                        } catch (IllegalStateException e) {
+                            throw new MissingBundleException(e.getMessage(), 
e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean withinTemplate(final Element element) {
+        if ("template".equals(element.getTagName())) {
+            return true;
+        } else {
+            final Node parentNode = element.getParentNode();
+            if (parentNode instanceof Element) {
+                return withinTemplate((Element) parentNode);
+            } else {
+                return false;
+            }
+        }
+    }
+
     private void updateReportingTaskControllerServices(final 
Set<ReportingTaskNode> reportingTasks, final Map<String, ControllerServiceNode> 
controllerServiceMapping) {
         for (ReportingTaskNode reportingTask : reportingTasks) {
             if (reportingTask.getProperties() != null) {
@@ -513,7 +573,19 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             throws ReportingTaskInstantiationException {
         // create a new reporting task node when the controller is not 
initialized or the flow is empty
         if (!controllerInitialized || existingFlowEmpty) {
-            final ReportingTaskNode reportingTask = 
controller.createReportingTask(dto.getType(), dto.getId(), false);
+            BundleCoordinate coordinate;
+            try {
+                coordinate = BundleUtils.getCompatibleBundle(dto.getType(), 
dto.getBundle());
+            } catch (final IllegalStateException e) {
+                final BundleDTO bundleDTO = dto.getBundle();
+                if (bundleDTO == null) {
+                    coordinate = BundleCoordinate.UNKNOWN_COORDINATE;
+                } else {
+                    coordinate = new BundleCoordinate(bundleDTO.getGroup(), 
bundleDTO.getArtifact(), bundleDTO.getVersion());
+                }
+            }
+
+            final ReportingTaskNode reportingTask = 
controller.createReportingTask(dto.getType(), dto.getId(), coordinate, false);
             reportingTask.setName(dto.getName());
             reportingTask.setComments(dto.getComments());
             reportingTask.setSchedulingPeriod(dto.getSchedulingPeriod());
@@ -964,7 +1036,20 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         final List<Element> processorNodeList = 
getChildrenByTagName(processGroupElement, "processor");
         for (final Element processorElement : processorNodeList) {
             final ProcessorDTO processorDTO = 
FlowFromDOMFactory.getProcessor(processorElement, encryptor);
-            final ProcessorNode procNode = 
controller.createProcessor(processorDTO.getType(), processorDTO.getId(), false);
+
+            BundleCoordinate coordinate;
+            try {
+                coordinate = 
BundleUtils.getCompatibleBundle(processorDTO.getType(), 
processorDTO.getBundle());
+            } catch (final IllegalStateException e) {
+                final BundleDTO bundleDTO = processorDTO.getBundle();
+                if (bundleDTO == null) {
+                    coordinate = BundleCoordinate.UNKNOWN_COORDINATE;
+                } else {
+                    coordinate = new BundleCoordinate(bundleDTO.getGroup(), 
bundleDTO.getArtifact(), bundleDTO.getVersion());
+                }
+            }
+
+            final ProcessorNode procNode = 
controller.createProcessor(processorDTO.getType(), processorDTO.getId(), 
coordinate, false);
             processGroup.addProcessor(procNode);
             updateProcessor(procNode, processorDTO, processGroup, controller);
         }
@@ -1257,6 +1342,30 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         return processGroup;
     }
 
+    public String checkMissingComponentsInheritability(final DataFlow 
existingFlow, final DataFlow proposedFlow) {
+        if (existingFlow == null) {
+            return null;  // no existing flow, so equivalent to proposed flow
+        }
+
+        final Set<String> existingMissingComponents = new 
HashSet<>(existingFlow.getMissingComponents());
+        
existingMissingComponents.removeAll(proposedFlow.getMissingComponents());
+
+        if (existingMissingComponents.size() > 0) {
+            final String missingIds = 
StringUtils.join(existingMissingComponents, ",");
+            return "Current flow has missing components that are not 
considered missing in the proposed flow (" + missingIds + ")";
+        }
+
+        final Set<String> proposedMissingComponents = new 
HashSet<>(proposedFlow.getMissingComponents());
+        
proposedMissingComponents.removeAll(existingFlow.getMissingComponents());
+
+        if (proposedMissingComponents.size() > 0) {
+            final String missingIds = 
StringUtils.join(proposedMissingComponents, ",");
+            return "Proposed flow has missing components that are not 
considered missing in the current flow (" + missingIds + ")";
+        }
+
+        return null;
+    }
+
     /**
      * If both authorizers are external authorizers, or if the both are 
internal
      * authorizers with equal fingerprints, then an uniheritable result with no
@@ -1353,6 +1462,11 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             return "Proposed Flow was empty but Current Flow is not";  // 
existing flow is not empty and proposed flow is empty (we could orphan 
flowfiles)
         }
 
+        if (logger.isTraceEnabled()) {
+            logger.trace("Local Fingerprint Before Hash = {}", new Object[] 
{existingFlowFingerprintBeforeHash});
+            logger.trace("Proposed Fingerprint Before Hash = {}", new Object[] 
{proposedFlowFingerprintBeforeHash});
+        }
+
         final boolean inheritable = 
existingFlowFingerprintBeforeHash.equals(proposedFlowFingerprintBeforeHash);
         if (!inheritable) {
             return findFirstDiscrepancy(existingFlowFingerprintBeforeHash, 
proposedFlowFingerprintBeforeHash, "Flows");

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 3d2c222..afc94d8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -18,13 +18,9 @@ package org.apache.nifi.controller;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
@@ -36,6 +32,8 @@ import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.Connectable;
@@ -107,7 +105,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     public static final String DEFAULT_YIELD_PERIOD = "1 sec";
     public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
     private final AtomicReference<ProcessGroup> processGroup;
-    private final Processor processor;
+    private final AtomicReference<ProcessorDetails> processorRef;
     private final AtomicReference<String> identifier;
     private final Map<Connection, Connectable> destinations;
     private final Map<Relationship, Set<Connection>> connections;
@@ -123,13 +121,6 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     private final AtomicInteger concurrentTaskCount;
     private final AtomicLong yieldExpiration;
     private final AtomicLong schedulingNanos;
-    private final boolean triggerWhenEmpty;
-    private final boolean sideEffectFree;
-    private final boolean triggeredSerially;
-    private final boolean triggerWhenAnyDestinationAvailable;
-    private final boolean eventDrivenSupported;
-    private final boolean batchSupported;
-    private final Requirement inputRequirement;
     private final ProcessScheduler processScheduler;
     private long runNanos = 0L;
     private final NiFiProperties nifiProperties;
@@ -138,23 +129,26 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                                                    // ??????? NOT any more
     private ExecutionNode executionNode;
 
-    public StandardProcessorNode(final Processor processor, final String uuid,
+    public StandardProcessorNode(final LoggableComponent<Processor> processor, 
final String uuid,
                                  final ValidationContextFactory 
validationContextFactory, final ProcessScheduler scheduler,
                                  final ControllerServiceProvider 
controllerServiceProvider, final NiFiProperties nifiProperties,
-                                 final VariableRegistry variableRegistry, 
final ComponentLog logger) {
+                                 final VariableRegistry variableRegistry) {
 
         this(processor, uuid, validationContextFactory, scheduler, 
controllerServiceProvider,
-            processor.getClass().getSimpleName(), 
processor.getClass().getCanonicalName(), nifiProperties, variableRegistry, 
logger);
+            processor.getComponent().getClass().getSimpleName(), 
processor.getComponent().getClass().getCanonicalName(), nifiProperties, 
variableRegistry, false);
     }
 
-    public StandardProcessorNode(final Processor processor, final String uuid,
+    public StandardProcessorNode(final LoggableComponent<Processor> processor, 
final String uuid,
                                  final ValidationContextFactory 
validationContextFactory, final ProcessScheduler scheduler,
                                  final ControllerServiceProvider 
controllerServiceProvider,
                                  final String componentType, final String 
componentCanonicalClass, final NiFiProperties nifiProperties,
-                                 final VariableRegistry variableRegistry, 
final ComponentLog logger) {
+                                 final VariableRegistry variableRegistry, 
final boolean isExtensionMissing) {
+
+        super(uuid, validationContextFactory, controllerServiceProvider, 
componentType, componentCanonicalClass, variableRegistry, isExtensionMissing);
+
+        final ProcessorDetails processorDetails = new 
ProcessorDetails(processor);
+        this.processorRef = new AtomicReference<>(processorDetails);
 
-        super(processor, uuid, validationContextFactory, 
controllerServiceProvider, componentType, componentCanonicalClass, 
variableRegistry, logger);
-        this.processor = processor;
         identifier = new AtomicReference<>(uuid);
         destinations = new HashMap<>();
         connections = new HashMap<>();
@@ -175,26 +169,11 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         penalizationPeriod = new 
AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
         this.nifiProperties = nifiProperties;
 
-        final Class<?> procClass = processor.getClass();
-        triggerWhenEmpty = 
procClass.isAnnotationPresent(TriggerWhenEmpty.class);
-        sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class);
-        batchSupported = procClass.isAnnotationPresent(SupportsBatching.class);
-        triggeredSerially = 
procClass.isAnnotationPresent(TriggerSerially.class);
-        triggerWhenAnyDestinationAvailable = 
procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
-        eventDrivenSupported = 
procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && 
!triggerWhenEmpty;
-
-        final boolean inputRequirementPresent = 
procClass.isAnnotationPresent(InputRequirement.class);
-        if (inputRequirementPresent) {
-            inputRequirement = 
procClass.getAnnotation(InputRequirement.class).value();
-        } else {
-            inputRequirement = Requirement.INPUT_ALLOWED;
-        }
-
         schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
         executionNode = ExecutionNode.ALL;
         try {
-            if (procClass.isAnnotationPresent(DefaultSchedule.class)) {
-                DefaultSchedule dsc = 
procClass.getAnnotation(DefaultSchedule.class);
+            if 
(processorDetails.getProcClass().isAnnotationPresent(DefaultSchedule.class)) {
+                DefaultSchedule dsc = 
processorDetails.getProcClass().getAnnotation(DefaultSchedule.class);
                 try {
                     this.setSchedulingStrategy(dsc.strategy());
                 } catch (Throwable ex) {
@@ -206,7 +185,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                     
this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN);
                     LOG.error(String.format("Error while setting scheduling 
period from DefaultSchedule annotation: %s", ex.getMessage()), ex);
                 }
-                if (!triggeredSerially) {
+                if (!processorDetails.isTriggeredSerially()) {
                     try {
                         setMaxConcurrentTasks(dsc.concurrentTasks());
                     } catch (Throwable ex) {
@@ -219,6 +198,21 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         }
     }
 
+    @Override
+    public ConfigurableComponent getComponent() {
+        return processorRef.get().getProcessor();
+    }
+
+    @Override
+    public ComponentLog getLogger() {
+        return processorRef.get().getComponentLog();
+    }
+
+    @Override
+    public BundleCoordinate getBundleCoordinate() {
+        return processorRef.get().getBundleCoordinate();
+    }
+
     /**
      * @return comments about this specific processor instance
      */
@@ -250,7 +244,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      *            new comments
      */
     @Override
-    public void setComments(final String comments) {
+    public synchronized void setComments(final String comments) {
         if (isRunning()) {
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
@@ -263,7 +257,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public void setPosition(final Position position) {
+    public synchronized void setPosition(final Position position) {
         this.position.set(position);
     }
 
@@ -273,7 +267,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public void setStyle(final Map<String, String> style) {
+    public synchronized void setStyle(final Map<String, String> style) {
         if (style != null) {
             this.style.set(Collections.unmodifiableMap(new HashMap<>(style)));
         }
@@ -304,7 +298,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      */
     @Override
     public boolean isTriggerWhenEmpty() {
-        return triggerWhenEmpty;
+        return processorRef.get().isTriggerWhenEmpty();
     }
 
     /**
@@ -313,12 +307,12 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      */
     @Override
     public boolean isSideEffectFree() {
-        return sideEffectFree;
+        return processorRef.get().isSideEffectFree();
     }
 
     @Override
     public boolean isHighThroughputSupported() {
-        return batchSupported;
+        return processorRef.get().isBatchSupported();
     }
 
     /**
@@ -328,7 +322,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      */
     @Override
     public boolean isTriggerWhenAnyDestinationAvailable() {
-        return triggerWhenAnyDestinationAvailable;
+        return processorRef.get().isTriggerWhenAnyDestinationAvailable();
     }
 
     /**
@@ -339,7 +333,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      *            tolerant
      */
     @Override
-    public void setLossTolerant(final boolean lossTolerant) {
+    public synchronized void setLossTolerant(final boolean lossTolerant) {
         if (isRunning()) {
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
@@ -389,6 +383,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      *         annotation, if one exists, else <code>null</code>.
      */
     public String getProcessorDescription() {
+        final Processor processor = processorRef.get().getProcessor();
         final CapabilityDescription capDesc = 
processor.getClass().getAnnotation(CapabilityDescription.class);
         String description = null;
         if (capDesc != null) {
@@ -398,7 +393,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public void setName(final String name) {
+    public synchronized void setName(final String name) {
         if (isRunning()) {
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
@@ -420,7 +415,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public boolean isEventDrivenSupported() {
-        return this.eventDrivenSupported;
+        return processorRef.get().isEventDrivenSupported();
     }
 
     /**
@@ -434,8 +429,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      *             Processor
      */
     @Override
-    public void setSchedulingStrategy(final SchedulingStrategy 
schedulingStrategy) {
-        if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && 
!eventDrivenSupported) {
+    public synchronized void setSchedulingStrategy(final SchedulingStrategy 
schedulingStrategy) {
+        if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && 
!processorRef.get().isEventDrivenSupported()) {
             // not valid. Just ignore it. We don't throw an Exception because 
if
             // a developer changes a Processor so that
             // it no longer supports EventDriven mode, we don't want the app to
@@ -461,7 +456,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public void setScheduldingPeriod(final String schedulingPeriod) {
+    public synchronized void setScheduldingPeriod(final String 
schedulingPeriod) {
         if (isRunning()) {
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
@@ -495,7 +490,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public void setExecutionNode(final ExecutionNode executionNode) {
+    public synchronized void setExecutionNode(final ExecutionNode 
executionNode) {
         this.executionNode = executionNode;
     }
 
@@ -510,7 +505,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public void setRunDuration(final long duration, final TimeUnit timeUnit) {
+    public synchronized void setRunDuration(final long duration, final 
TimeUnit timeUnit) {
         if (duration < 0) {
             throw new IllegalArgumentException("Run Duration must be 
non-negative value; cannot set to "
                     + timeUnit.toSeconds(duration) + " seconds");
@@ -530,7 +525,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public void setYieldPeriod(final String yieldPeriod) {
+    public synchronized void setYieldPeriod(final String yieldPeriod) {
         if (isRunning()) {
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
@@ -549,6 +544,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      */
     @Override
     public void yield() {
+        final Processor processor = processorRef.get().getProcessor();
         final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
         yield(yieldMillis, TimeUnit.MILLISECONDS);
 
@@ -587,7 +583,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public void setPenalizationPeriod(final String penalizationPeriod) {
+    public synchronized void setPenalizationPeriod(final String 
penalizationPeriod) {
         if (isRunning()) {
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
@@ -609,21 +605,21 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      *             if the given value is less than 1
      */
     @Override
-    public void setMaxConcurrentTasks(final int taskCount) {
+    public synchronized void setMaxConcurrentTasks(final int taskCount) {
         if (isRunning()) {
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
         if (taskCount < 1 && getSchedulingStrategy() != 
SchedulingStrategy.EVENT_DRIVEN) {
             throw new IllegalArgumentException();
         }
-        if (!triggeredSerially) {
+        if (!isTriggeredSerially()) {
             concurrentTaskCount.set(taskCount);
         }
     }
 
     @Override
     public boolean isTriggeredSerially() {
-        return triggeredSerially;
+        return processorRef.get().isTriggeredSerially();
     }
 
     /**
@@ -641,7 +637,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public void setBulletinLevel(final LogLevel level) {
+    public synchronized void setBulletinLevel(final LogLevel level) {
         
LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID,
 level);
     }
 
@@ -842,6 +838,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         Relationship returnRel = specRel;
 
         final Set<Relationship> relationships;
+        final Processor processor = processorRef.get().getProcessor();
         try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
             relationships = processor.getRelationships();
         }
@@ -857,7 +854,17 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public Processor getProcessor() {
-        return this.processor;
+        return processorRef.get().getProcessor();
+    }
+
+    @Override
+    public synchronized void setProcessor(final LoggableComponent<Processor> 
processor) {
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
+        }
+
+        final ProcessorDetails processorDetails = new 
ProcessorDetails(processor);
+        processorRef.set(processorDetails);
     }
 
     /**
@@ -888,6 +895,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     public Set<Relationship> getUndefinedRelationships() {
         final Set<Relationship> undefined = new HashSet<>();
         final Set<Relationship> relationships;
+        final Processor processor = processorRef.get().getProcessor();
         try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
             relationships = processor.getRelationships();
         }
@@ -943,10 +951,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
             final ValidationContext validationContext = 
this.getValidationContextFactory()
                 .newValidationContext(getProperties(), getAnnotationData(), 
getProcessGroupIdentifier(), getIdentifier());
 
-            final Collection<ValidationResult> validationResults;
-            try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getProcessor().getClass(), 
processor.getIdentifier())) {
-                validationResults = getProcessor().validate(validationContext);
-            }
+            final Collection<ValidationResult> validationResults = 
super.validate(validationContext);
 
             for (final ValidationResult result : validationResults) {
                 if (!result.isValid()) {
@@ -993,10 +998,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                 final ValidationContext validationContext = 
this.getValidationContextFactory()
                         .newValidationContext(getProperties(), 
getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
 
-                final Collection<ValidationResult> validationResults;
-                try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getProcessor().getClass(), 
processor.getIdentifier())) {
-                    validationResults = 
getProcessor().validate(validationContext);
-                }
+                final Collection<ValidationResult> validationResults = 
super.validate(validationContext);
 
                 for (final ValidationResult result : validationResults) {
                     if (!result.isValid()) {
@@ -1045,7 +1047,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public Requirement getInputRequirement() {
-        return inputRequirement;
+        return processorRef.get().getInputRequirement();
     }
 
     /**
@@ -1071,14 +1073,16 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
 
     @Override
     public Collection<Relationship> getRelationships() {
-        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getProcessor().getClass(), 
processor.getIdentifier())) {
+        final Processor processor = processorRef.get().getProcessor();
+        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
             return getProcessor().getRelationships();
         }
     }
 
     @Override
     public String toString() {
-        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getProcessor().getClass(), 
processor.getIdentifier())) {
+        final Processor processor = processorRef.get().getProcessor();
+        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
             return getProcessor().toString();
         }
     }
@@ -1089,12 +1093,13 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
     }
 
     @Override
-    public void setProcessGroup(final ProcessGroup group) {
+    public synchronized void setProcessGroup(final ProcessGroup group) {
         this.processGroup.set(group);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+        final Processor processor = processorRef.get().getProcessor();
         try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
             processor.onTrigger(context, sessionFactory);
         }
@@ -1266,8 +1271,15 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         if (!this.isValid()) {
             throw new IllegalStateException( "Processor " + this.getName() + " 
is not in a valid state due to " + this.getValidationErrors());
         }
+        final Processor processor = processorRef.get().getProcessor();
         final ComponentLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
-        if (this.scheduledState.compareAndSet(ScheduledState.STOPPED, 
ScheduledState.STARTING)) { // will ensure that the Processor represented by 
this node can only be started once
+
+        final boolean starting;
+        synchronized (this) {
+            starting = 
this.scheduledState.compareAndSet(ScheduledState.STOPPED, 
ScheduledState.STARTING);
+        }
+
+        if (starting) { // will ensure that the Processor represented by this 
node can only be started once
             final Runnable startProcRunnable = new Runnable() {
                 @Override
                 public void run() {
@@ -1293,7 +1305,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                     } catch (final Exception e) {
                         final Throwable cause = e instanceof 
InvocationTargetException ? e.getCause() : e;
                         procLog.error("{} failed to invoke @OnScheduled method 
due to {}; processor will not be scheduled to run for {} seconds",
-                            new Object[] 
{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 
1000L}, cause);
+                                new 
Object[]{StandardProcessorNode.this.getProcessor(), cause, 
administrativeYieldMillis / 1000L}, cause);
                         LOG.error("Failed to invoke @OnScheduled method due to 
{}", cause.toString(), cause);
 
                         
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
@@ -1309,7 +1321,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
             };
             taskScheduler.execute(startProcRunnable);
         } else {
-            final String procName = this.processor.getClass().getSimpleName();
+            final String procName = processorRef.getClass().getSimpleName();
             LOG.warn("Can not start '" + procName
                     + "' since it's already in the process of being started or 
it is DISABLED - "
                     + scheduledState.get());
@@ -1348,7 +1360,11 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     @Override
     public <T extends ProcessContext & ControllerServiceLookup> void 
stop(final ScheduledExecutorService scheduler,
         final T processContext, final SchedulingAgent schedulingAgent, final 
ScheduleState scheduleState) {
-        LOG.info("Stopping processor: " + this.processor.getClass());
+
+        final Processor processor = processorRef.get().getProcessor();
+        LOG.info("Stopping processor: " + processor.getClass());
+
+
         if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, 
ScheduledState.STOPPING)) { // will ensure that the Processor represented by 
this node can only be stopped once
             scheduleState.incrementActiveThreadCount();
 
@@ -1385,14 +1401,14 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
                 }
             });
         } else {
-            /*
-             * We do compareAndSet() instead of set() to ensure that Processor
-             * stoppage is handled consistently including a condition where
-             * Processor never got a chance to transition to RUNNING state
-             * before stop() was called. If that happens the stop processor
-             * routine will be initiated in start() method, otherwise the IF
-             * part will handle the stop processor routine.
-             */
+        /*
+         * We do compareAndSet() instead of set() to ensure that Processor
+         * stoppage is handled consistently including a condition where
+         * Processor never got a chance to transition to RUNNING state
+         * before stop() was called. If that happens the stop processor
+         * routine will be initiated in start() method, otherwise the IF
+         * part will handle the stop processor routine.
+         */
             this.scheduledState.compareAndSet(ScheduledState.STARTING, 
ScheduledState.STOPPING);
         }
     }
@@ -1424,6 +1440,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      * </p>
      */
     private <T> void invokeTaskAsCancelableFuture(final 
SchedulingAgentCallback callback, final Callable<T> task) {
+        final Processor processor = processorRef.get().getProcessor();
         final String timeoutString = 
nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
         final long onScheduleTimeout = timeoutString == null ? 60000
                 : FormatUtils.getTimeDuration(timeoutString.trim(), 
TimeUnit.MILLISECONDS);
@@ -1431,19 +1448,19 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
         try {
             taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
         } catch (final InterruptedException e) {
-            LOG.warn("Thread was interrupted while waiting for processor '" + 
this.processor.getClass().getSimpleName()
+            LOG.warn("Thread was interrupted while waiting for processor '" + 
processor.getClass().getSimpleName()
                     + "' lifecycle OnScheduled operation to finish.");
             Thread.currentThread().interrupt();
             throw new RuntimeException("Interrupted while executing one of 
processor's OnScheduled tasks.", e);
         } catch (final TimeoutException e) {
             taskFuture.cancel(true);
             LOG.warn("Timed out while waiting for OnScheduled of '"
-                    + this.processor.getClass().getSimpleName()
+                    + processor.getClass().getSimpleName()
                     + "' processor to finish. An attempt is made to cancel the 
task via Thread.interrupt(). However it does not "
                     + "guarantee that the task will be canceled since the code 
inside current OnScheduled operation may "
                 + "have been written to ignore interrupts which may result in 
a runaway thread. This could lead to more issues, "
                     + "eventually requiring NiFi to be restarted. This is 
usually a bug in the target Processor '"
-                    + this.processor + "' that needs to be documented, 
reported and eventually fixed.");
+                    + processor + "' that needs to be documented, reported and 
eventually fixed.");
             throw new RuntimeException("Timed out while executing one of 
processor's OnScheduled task.", e);
         } catch (final ExecutionException e){
             throw new RuntimeException("Failed while executing one of 
processor's OnScheduled task.", e);
@@ -1457,4 +1474,5 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         final ProcessGroup group = getProcessGroup();
         return group == null ? null : group.getIdentifier();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
index 24abd5f..ff7d61f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
@@ -17,20 +17,6 @@
 
 package org.apache.nifi.controller;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.dom.DOMSource;
-
 import org.apache.nifi.persistence.TemplateDeserializer;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
@@ -46,6 +32,19 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.w3c.dom.Element;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.dom.DOMSource;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class TemplateUtils {
 
     public static TemplateDTO parseDto(final Element templateElement) {
@@ -199,6 +198,8 @@ public class TemplateUtils {
                 }
             }
 
+            processorDTO.setExtensionMissing(null);
+            processorDTO.setMultipleVersionsAvailable(null);
             processorDTO.setValidationErrors(null);
             processorDTO.setInputRequirement(null);
             processorDTO.setDescription(null);
@@ -229,6 +230,7 @@ public class TemplateUtils {
         descriptor.setRequired(null);
         descriptor.setSensitive(null);
         descriptor.setSupportsEl(null);
+        descriptor.setIdentifiesControllerServiceBundle(null);
     }
 
     private static void scrubControllerServices(final 
Set<ControllerServiceDTO> controllerServices) {
@@ -246,6 +248,11 @@ public class TemplateUtils {
                 }
             }
 
+            serviceDTO.setControllerServiceApis(null);
+
+            serviceDTO.setExtensionMissing(null);
+            serviceDTO.setMultipleVersionsAvailable(null);
+
             serviceDTO.setCustomUiUrl(null);
             serviceDTO.setValidationErrors(null);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index d7ae309..aeb1d07 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -17,10 +17,13 @@
 package org.apache.nifi.controller.reporting;
 
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractConfiguredComponent;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.LoggableComponent;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
@@ -49,7 +52,7 @@ public abstract class AbstractReportingTaskNode extends 
AbstractConfiguredCompon
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractReportingTaskNode.class);
 
-    private final ReportingTask reportingTask;
+    private final AtomicReference<ReportingTaskDetails> reportingTaskRef;
     private final ProcessScheduler processScheduler;
     private final ControllerServiceLookup serviceLookup;
 
@@ -59,28 +62,26 @@ public abstract class AbstractReportingTaskNode extends 
AbstractConfiguredCompon
     private volatile String comment;
     private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
 
-    public AbstractReportingTaskNode(final ReportingTask reportingTask, final 
String id,
+    public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> 
reportingTask, final String id,
                                      final ControllerServiceProvider 
controllerServiceProvider, final ProcessScheduler processScheduler,
-                                     final ValidationContextFactory 
validationContextFactory, final VariableRegistry variableRegistry,
-                                     final ComponentLog logger) {
+                                     final ValidationContextFactory 
validationContextFactory, final VariableRegistry variableRegistry) {
 
         this(reportingTask, id, controllerServiceProvider, processScheduler, 
validationContextFactory,
-            reportingTask.getClass().getSimpleName(), 
reportingTask.getClass().getCanonicalName(),variableRegistry, logger);
+            reportingTask.getComponent().getClass().getSimpleName(), 
reportingTask.getComponent().getClass().getCanonicalName(),variableRegistry, 
false);
     }
 
 
-    public AbstractReportingTaskNode(final ReportingTask reportingTask, final 
String id,
-                                     final ControllerServiceProvider 
controllerServiceProvider, final ProcessScheduler processScheduler,
-                                     final ValidationContextFactory 
validationContextFactory,
+    public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> 
reportingTask, final String id, final ControllerServiceProvider 
controllerServiceProvider,
+                                     final ProcessScheduler processScheduler, 
final ValidationContextFactory validationContextFactory,
                                      final String componentType, final String 
componentCanonicalClass, final VariableRegistry variableRegistry,
-                                     final ComponentLog logger) {
+                                     final boolean isExtensionMissing) {
 
-        super(reportingTask, id, validationContextFactory, 
controllerServiceProvider, componentType, componentCanonicalClass, 
variableRegistry, logger);
-        this.reportingTask = reportingTask;
+        super(id, validationContextFactory, controllerServiceProvider, 
componentType, componentCanonicalClass, variableRegistry, isExtensionMissing);
+        this.reportingTaskRef = new AtomicReference<>(new 
ReportingTaskDetails(reportingTask));
         this.processScheduler = processScheduler;
         this.serviceLookup = controllerServiceProvider;
 
-        final Class<?> reportingClass = reportingTask.getClass();
+        final Class<?> reportingClass = 
reportingTask.getComponent().getClass();
 
         DefaultSchedule dsc = AnnotationUtils.findAnnotation(reportingClass, 
DefaultSchedule.class);
         if(dsc != null) {
@@ -99,6 +100,21 @@ public abstract class AbstractReportingTaskNode extends 
AbstractConfiguredCompon
     }
 
     @Override
+    public ConfigurableComponent getComponent() {
+        return reportingTaskRef.get().getReportingTask();
+    }
+
+    @Override
+    public BundleCoordinate getBundleCoordinate() {
+        return reportingTaskRef.get().getBundleCoordinate();
+    }
+
+    @Override
+    public ComponentLog getLogger() {
+        return reportingTaskRef.get().getComponentLog();
+    }
+
+    @Override
     public void setSchedulingStrategy(final SchedulingStrategy 
schedulingStrategy) {
         this.schedulingStrategy.set(schedulingStrategy);
     }
@@ -125,7 +141,15 @@ public abstract class AbstractReportingTaskNode extends 
AbstractConfiguredCompon
 
     @Override
     public ReportingTask getReportingTask() {
-        return reportingTask;
+        return reportingTaskRef.get().getReportingTask();
+    }
+
+    @Override
+    public void setReportingTask(final LoggableComponent<ReportingTask> 
reportingTask) {
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Reporting Task 
configuration while Reporting Task is running");
+        }
+        this.reportingTaskRef.set(new ReportingTaskDetails(reportingTask));
     }
 
     @Override
@@ -177,50 +201,50 @@ public abstract class AbstractReportingTaskNode extends 
AbstractConfiguredCompon
     @Override
     public void verifyCanDelete() {
         if (isRunning()) {
-            throw new IllegalStateException("Cannot delete " + 
reportingTask.getIdentifier() + " because it is currently running");
+            throw new IllegalStateException("Cannot delete " + 
getReportingTask().getIdentifier() + " because it is currently running");
         }
     }
 
     @Override
     public void verifyCanDisable() {
         if (isRunning()) {
-            throw new IllegalStateException("Cannot disable " + 
reportingTask.getIdentifier() + " because it is currently running");
+            throw new IllegalStateException("Cannot disable " + 
getReportingTask().getIdentifier() + " because it is currently running");
         }
 
         if (isDisabled()) {
-            throw new IllegalStateException("Cannot disable " + 
reportingTask.getIdentifier() + " because it is already disabled");
+            throw new IllegalStateException("Cannot disable " + 
getReportingTask().getIdentifier() + " because it is already disabled");
         }
     }
 
     @Override
     public void verifyCanEnable() {
         if (!isDisabled()) {
-            throw new IllegalStateException("Cannot enable " + 
reportingTask.getIdentifier() + " because it is not disabled");
+            throw new IllegalStateException("Cannot enable " + 
getReportingTask().getIdentifier() + " because it is not disabled");
         }
     }
 
     @Override
     public void verifyCanStart() {
         if (isDisabled()) {
-            throw new IllegalStateException("Cannot start " + 
reportingTask.getIdentifier() + " because it is currently disabled");
+            throw new IllegalStateException("Cannot start " + 
getReportingTask().getIdentifier() + " because it is currently disabled");
         }
 
         if (isRunning()) {
-            throw new IllegalStateException("Cannot start " + 
reportingTask.getIdentifier() + " because it is already running");
+            throw new IllegalStateException("Cannot start " + 
getReportingTask().getIdentifier() + " because it is already running");
         }
     }
 
     @Override
     public void verifyCanStop() {
         if (!isRunning()) {
-            throw new IllegalStateException("Cannot stop " + 
reportingTask.getIdentifier() + " because it is not running");
+            throw new IllegalStateException("Cannot stop " + 
getReportingTask().getIdentifier() + " because it is not running");
         }
     }
 
     @Override
     public void verifyCanUpdate() {
         if (isRunning()) {
-            throw new IllegalStateException("Cannot update " + 
reportingTask.getIdentifier() + " because it is currently running");
+            throw new IllegalStateException("Cannot update " + 
getReportingTask().getIdentifier() + " because it is currently running");
         }
     }
 
@@ -275,4 +299,5 @@ public abstract class AbstractReportingTaskNode extends 
AbstractConfiguredCompon
         }
         return results != null ? results : Collections.emptySet();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskDetails.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskDetails.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskDetails.java
new file mode 100644
index 0000000..3561c07
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskDetails.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.reporting;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.ReportingTask;
+
+/**
+ * Holder for StandardReportingTaskNode to atomically swap out the component.
+ */
+class ReportingTaskDetails {
+
+    private final ReportingTask reportingTask;
+    private final ComponentLog componentLog;
+    private final BundleCoordinate bundleCoordinate;
+
+    public ReportingTaskDetails(final LoggableComponent<ReportingTask> 
reportingTask) {
+        this.reportingTask = reportingTask.getComponent();
+        this.componentLog = reportingTask.getLogger();
+        this.bundleCoordinate = reportingTask.getBundleCoordinate();
+    }
+
+    public ReportingTask getReportingTask() {
+        return reportingTask;
+    }
+
+    public ComponentLog getComponentLog() {
+        return componentLog;
+    }
+
+    public BundleCoordinate getBundleCoordinate() {
+        return bundleCoordinate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index adb2240..edf1b67 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -22,10 +22,10 @@ import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.resource.ResourceType;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.LoggableComponent;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingTask;
@@ -34,17 +34,18 @@ public class StandardReportingTaskNode extends 
AbstractReportingTaskNode impleme
 
     private final FlowController flowController;
 
-    public StandardReportingTaskNode(final ReportingTask reportingTask, final 
String id, final FlowController controller,
+    public StandardReportingTaskNode(final LoggableComponent<ReportingTask> 
reportingTask, final String id, final FlowController controller,
                                      final ProcessScheduler processScheduler, 
final ValidationContextFactory validationContextFactory,
-                                     final VariableRegistry variableRegistry, 
final ComponentLog logger) {
-        super(reportingTask, id, controller, processScheduler, 
validationContextFactory, variableRegistry, logger);
+                                     final VariableRegistry variableRegistry) {
+        super(reportingTask, id, controller, processScheduler, 
validationContextFactory, variableRegistry);
         this.flowController = controller;
     }
 
-    public StandardReportingTaskNode(final ReportingTask reportingTask, final 
String id, final FlowController controller,
-        final ProcessScheduler processScheduler, final 
ValidationContextFactory validationContextFactory,
-        final String componentType, final String canonicalClassName, final 
VariableRegistry variableRegistry, final ComponentLog logger) {
-        super(reportingTask, id, controller, processScheduler, 
validationContextFactory, componentType, canonicalClassName,variableRegistry, 
logger);
+    public StandardReportingTaskNode(final LoggableComponent<ReportingTask> 
reportingTask, final String id, final FlowController controller,
+                                     final ProcessScheduler processScheduler, 
final ValidationContextFactory validationContextFactory,
+                                     final String componentType, final String 
canonicalClassName, final VariableRegistry variableRegistry,
+                                     final boolean isExtensionMissing) {
+        super(reportingTask, id, controller, processScheduler, 
validationContextFactory, componentType, canonicalClassName,variableRegistry, 
isExtensionMissing);
         this.flowController = controller;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index f8d38bc..731d914 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -16,24 +16,16 @@
  */
 package org.apache.nifi.controller.serialization;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
-import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.DomUtils;
+import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
@@ -50,8 +42,29 @@ import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 public class FlowFromDOMFactory {
 
+    public static BundleDTO getBundle(final Element bundleElement) {
+        if (bundleElement == null) {
+            return null;
+        }
+
+        final Element groupElement = DomUtils.getChild(bundleElement, "group");
+        final Element artifactElement = DomUtils.getChild(bundleElement, 
"artifact");
+        final Element versionElement = DomUtils.getChild(bundleElement, 
"version");
+
+        return new BundleDTO(groupElement.getTextContent(), 
artifactElement.getTextContent(), versionElement.getTextContent());
+    }
+
     public static PositionDTO getPosition(final Element positionElement) {
         if (positionElement == null) {
             throw new IllegalArgumentException("Invalid Flow: Found no 
'position' element");
@@ -89,6 +102,7 @@ public class FlowFromDOMFactory {
         dto.setName(getString(element, "name"));
         dto.setComments(getString(element, "comment"));
         dto.setType(getString(element, "class"));
+        dto.setBundle(getBundle(DomUtils.getChild(element, "bundle")));
 
         final boolean enabled = getBoolean(element, "enabled");
         dto.setState(enabled ? ControllerServiceState.ENABLED.name() : 
ControllerServiceState.DISABLED.name());
@@ -106,6 +120,7 @@ public class FlowFromDOMFactory {
         dto.setName(getString(element, "name"));
         dto.setComments(getString(element, "comment"));
         dto.setType(getString(element, "class"));
+        dto.setBundle(getBundle(DomUtils.getChild(element, "bundle")));
         dto.setSchedulingPeriod(getString(element, "schedulingPeriod"));
         dto.setState(getString(element, "scheduledState"));
         dto.setSchedulingStrategy(getString(element, "schedulingStrategy"));
@@ -353,6 +368,7 @@ public class FlowFromDOMFactory {
         dto.setId(getString(element, "id"));
         dto.setName(getString(element, "name"));
         dto.setType(getString(element, "class"));
+        dto.setBundle(getBundle(DomUtils.getChild(element, "bundle")));
         dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
         dto.setStyle(getStyle(DomUtils.getChild(element, "styles")));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java
index 86614af..bd0a7ad 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.serialization;
 
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.MissingBundleException;
 import org.apache.nifi.controller.UninheritableFlowException;
 import org.apache.nifi.encrypt.StringEncryptor;
 
@@ -38,8 +39,9 @@ public interface FlowSynchronizer {
      * @throws FlowSerializationException if proposed flow is not a valid flow 
configuration file
      * @throws UninheritableFlowException if the proposed flow cannot be 
loaded by the controller because in doing so would risk orphaning flow files
      * @throws FlowSynchronizationException if updates to the controller 
failed. If this exception is thrown, then the controller should be considered 
unsafe to be used
+     * @throws MissingBundleException if the proposed flow cannot be loaded by 
the controller because it contains a bundle that is not available to the 
controller
      */
     void sync(FlowController controller, DataFlow dataFlow, StringEncryptor 
encryptor)
-            throws FlowSerializationException, UninheritableFlowException, 
FlowSynchronizationException;
+            throws FlowSerializationException, UninheritableFlowException, 
FlowSynchronizationException, MissingBundleException;
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index f6e3d2b..aa7022b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -16,24 +16,7 @@
  */
 package org.apache.nifi.controller.serialization;
 
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.TransformerFactoryConfigurationError;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
@@ -62,6 +45,23 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.TransformerFactoryConfigurationError;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Serializes a Flow Controller as XML to an output stream.
  *
@@ -203,6 +203,28 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         }
     }
 
+    private static void addBundle(final Element parentElement, final 
BundleCoordinate coordinate) {
+        // group
+        final Element groupElement = 
parentElement.getOwnerDocument().createElement("group");
+        groupElement.setTextContent(coordinate.getGroup());
+
+        // artifact
+        final Element artifactElement = 
parentElement.getOwnerDocument().createElement("artifact");
+        artifactElement.setTextContent(coordinate.getId());
+
+        // version
+        final Element versionElement = 
parentElement.getOwnerDocument().createElement("version");
+        versionElement.setTextContent(coordinate.getVersion());
+
+        // bundle
+        final Element bundleElement = 
parentElement.getOwnerDocument().createElement("bundle");
+        bundleElement.appendChild(groupElement);
+        bundleElement.appendChild(artifactElement);
+        bundleElement.appendChild(versionElement);
+
+        parentElement.appendChild(bundleElement);
+    }
+
     private void addStyle(final Element parentElement, final Map<String, 
String> style) {
         final Element element = 
parentElement.getOwnerDocument().createElement("styles");
 
@@ -340,6 +362,9 @@ public class StandardFlowSerializer implements 
FlowSerializer {
 
         addTextElement(element, "comment", processor.getComments());
         addTextElement(element, "class", processor.getCanonicalClassName());
+
+        addBundle(element, processor.getBundleCoordinate());
+
         addTextElement(element, "maxConcurrentTasks", 
processor.getMaxConcurrentTasks());
         addTextElement(element, "schedulingPeriod", 
processor.getSchedulingPeriod());
         addTextElement(element, "penalizationPeriod", 
processor.getPenalizationPeriod());
@@ -451,6 +476,8 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         addTextElement(serviceElement, "comment", serviceNode.getComments());
         addTextElement(serviceElement, "class", 
serviceNode.getCanonicalClassName());
 
+        addBundle(serviceElement, serviceNode.getBundleCoordinate());
+
         final ControllerServiceState state = serviceNode.getState();
         final boolean enabled = (state == ControllerServiceState.ENABLED || 
state == ControllerServiceState.ENABLING);
         addTextElement(serviceElement, "enabled", String.valueOf(enabled));
@@ -466,6 +493,9 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         addTextElement(taskElement, "name", taskNode.getName());
         addTextElement(taskElement, "comment", taskNode.getComments());
         addTextElement(taskElement, "class", taskNode.getCanonicalClassName());
+
+        addBundle(taskElement, taskNode.getBundleCoordinate());
+
         addTextElement(taskElement, "schedulingPeriod", 
taskNode.getSchedulingPeriod());
         addTextElement(taskElement, "scheduledState", 
taskNode.getScheduledState().name());
         addTextElement(taskElement, "schedulingStrategy", 
taskNode.getSchedulingStrategy().name());

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceDetails.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceDetails.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceDetails.java
new file mode 100644
index 0000000..3db77c9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceDetails.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.logging.ComponentLog;
+
+/**
+ * Holder for StandardControllerServiceNode to atomically swap out the 
component.
+ */
+public class ControllerServiceDetails {
+
+    private final ControllerService proxiedControllerService;
+    private final ControllerService implementation;
+    private final ComponentLog componentLog;
+    private final BundleCoordinate bundleCoordinate;
+    private final ControllerServiceInvocationHandler invocationHandler;
+
+    public ControllerServiceDetails(final LoggableComponent<ControllerService> 
implementation,
+                                    final LoggableComponent<ControllerService> 
proxiedControllerService,
+                                    final ControllerServiceInvocationHandler 
invocationHandler) {
+        this.proxiedControllerService = 
proxiedControllerService.getComponent();
+        this.implementation = implementation.getComponent();
+        this.componentLog = implementation.getLogger();
+        this.bundleCoordinate = implementation.getBundleCoordinate();
+        this.invocationHandler = invocationHandler;
+    }
+
+    public ControllerService getProxiedControllerService() {
+        return proxiedControllerService;
+    }
+
+    public ControllerService getImplementation() {
+        return implementation;
+    }
+
+    public ComponentLog getComponentLog() {
+        return componentLog;
+    }
+
+    public BundleCoordinate getBundleCoordinate() {
+        return bundleCoordinate;
+    }
+
+    public ControllerServiceInvocationHandler getInvocationHandler() {
+        return invocationHandler;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 1596c63..520d3fb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -16,30 +16,16 @@
  */
 package org.apache.nifi.controller.service;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.util.DomUtils;
+import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +34,22 @@ import org.w3c.dom.Element;
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 public class ControllerServiceLoader {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ControllerServiceLoader.class);
@@ -161,7 +163,7 @@ public class ControllerServiceLoader {
         // create a new id for the clone seeded from the original id so that 
it is consistent in a cluster
         final UUID id = 
UUID.nameUUIDFromBytes(controllerService.getIdentifier().getBytes(StandardCharsets.UTF_8));
 
-        final ControllerServiceNode clone = 
provider.createControllerService(controllerService.getCanonicalClassName(), 
id.toString(), false);
+        final ControllerServiceNode clone = 
provider.createControllerService(controllerService.getCanonicalClassName(), 
id.toString(), controllerService.getBundleCoordinate(), false);
         clone.setName(controllerService.getName());
         clone.setComments(controllerService.getComments());
 
@@ -179,7 +181,19 @@ public class ControllerServiceLoader {
     private static ControllerServiceNode createControllerService(final 
ControllerServiceProvider provider, final Element controllerServiceElement, 
final StringEncryptor encryptor) {
         final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
 
-        final ControllerServiceNode node = 
provider.createControllerService(dto.getType(), dto.getId(), false);
+        BundleCoordinate coordinate;
+        try {
+            coordinate = BundleUtils.getCompatibleBundle(dto.getType(), 
dto.getBundle());
+        } catch (final IllegalStateException e) {
+            final BundleDTO bundleDTO = dto.getBundle();
+            if (bundleDTO == null) {
+                coordinate = BundleCoordinate.UNKNOWN_COORDINATE;
+            } else {
+                coordinate = new BundleCoordinate(bundleDTO.getGroup(), 
bundleDTO.getArtifact(), bundleDTO.getVersion());
+            }
+        }
+
+        final ControllerServiceNode node = 
provider.createControllerService(dto.getType(), dto.getId(), coordinate, false);
         node.setName(dto.getName());
         node.setComments(dto.getComments());
         return node;

Reply via email to