http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
index 965976a..db68824 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java
@@ -58,6 +58,11 @@ public enum DisconnectionCode {
     MISMATCHED_FLOWS("Node's Flow did not Match Cluster Flow"),
 
     /**
+     * The node was missing a bundle used the cluster flow.
+     */
+    MISSING_BUNDLE("Node was missing bundle used by Cluster Flow"),
+
+    /**
      * Cannot communicate with the node
      */
     UNABLE_TO_COMMUNICATE("Unable to Communicate with Node"),

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
index 6c5c1ce..7822db7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
@@ -18,6 +18,9 @@ package org.apache.nifi.cluster.protocol;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 
@@ -34,6 +37,7 @@ public class StandardDataFlow implements Serializable, 
DataFlow {
     private final byte[] flow;
     private final byte[] snippetBytes;
     private final byte[] authorizerFingerprint;
+    private final Set<String> missingComponentIds;
 
     /**
      * Constructs an instance.
@@ -41,22 +45,27 @@ public class StandardDataFlow implements Serializable, 
DataFlow {
      * @param flow a valid flow as bytes, which cannot be null
      * @param snippetBytes an XML representation of snippets.  May be null.
      * @param authorizerFingerprint the bytes of the Authorizer's fingerprint. 
May be null when using an external Authorizer.
+     * @param missingComponentIds the ids of components that were created as 
missing ghost components
      *
      * @throws NullPointerException if flow is null
      */
-    public StandardDataFlow(final byte[] flow, final byte[] snippetBytes, 
final byte[] authorizerFingerprint) {
+    public StandardDataFlow(final byte[] flow, final byte[] snippetBytes, 
final byte[] authorizerFingerprint, final Set<String> missingComponentIds) {
         if(flow == null){
             throw new NullPointerException("Flow cannot be null");
         }
         this.flow = flow;
         this.snippetBytes = snippetBytes;
         this.authorizerFingerprint = authorizerFingerprint;
+        this.missingComponentIds = 
Collections.unmodifiableSet(missingComponentIds == null
+                ? new HashSet<>() : new HashSet<>(missingComponentIds));
     }
 
     public StandardDataFlow(final DataFlow toCopy) {
         this.flow = copy(toCopy.getFlow());
         this.snippetBytes = copy(toCopy.getSnippets());
         this.authorizerFingerprint = copy(toCopy.getAuthorizerFingerprint());
+        this.missingComponentIds = 
Collections.unmodifiableSet(toCopy.getMissingComponents() == null
+                ? new HashSet<>() : new 
HashSet<>(toCopy.getMissingComponents()));
     }
 
     private static byte[] copy(final byte[] bytes) {
@@ -79,4 +88,9 @@ public class StandardDataFlow implements Serializable, 
DataFlow {
         return authorizerFingerprint;
     }
 
+    @Override
+    public Set<String> getMissingComponents() {
+        return missingComponentIds;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
index bfdfe3e..738fa86 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
+import java.util.Set;
+
 /**
  */
 public class AdaptedDataFlow {
@@ -23,6 +25,7 @@ public class AdaptedDataFlow {
     private byte[] flow;
     private byte[] snippets;
     private byte[] authorizerFingerprint;
+    private Set<String> missingComponents;
 
     public byte[] getFlow() {
         return flow;
@@ -48,4 +51,12 @@ public class AdaptedDataFlow {
         this.authorizerFingerprint = authorizerFingerprint;
     }
 
+    public Set<String> getMissingComponents() {
+        return missingComponents;
+    }
+
+    public void setMissingComponents(Set<String> missingComponents) {
+        this.missingComponents = missingComponents;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
index 054d56e..d31e8e5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
@@ -33,6 +33,7 @@ public class DataFlowAdapter extends 
XmlAdapter<AdaptedDataFlow, DataFlow> {
             aDf.setFlow(df.getFlow());
             aDf.setSnippets(df.getSnippets());
             aDf.setAuthorizerFingerprint(df.getAuthorizerFingerprint());
+            aDf.setMissingComponents(df.getMissingComponents());
         }
 
         return aDf;
@@ -40,7 +41,7 @@ public class DataFlowAdapter extends 
XmlAdapter<AdaptedDataFlow, DataFlow> {
 
     @Override
     public DataFlow unmarshal(final AdaptedDataFlow aDf) {
-        final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), 
aDf.getSnippets(), aDf.getAuthorizerFingerprint());
+        final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), 
aDf.getSnippets(), aDf.getAuthorizerFingerprint(), aDf.getMissingComponents());
         return dataFlow;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
index 8c2cca6..7ced87e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
@@ -24,6 +24,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.IntStream;
@@ -57,7 +58,7 @@ public class TestJaxbProtocolUtils {
 
         final ConnectionResponseMessage msg = new ConnectionResponseMessage();
         final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 
8000, "localhost", 8001, "localhost", 8002, 8003, true);
-        final DataFlow dataFlow = new StandardDataFlow(new byte[0], new 
byte[0], new byte[0]);
+        final DataFlow dataFlow = new StandardDataFlow(new byte[0], new 
byte[0], new byte[0], new HashSet<>());
         final List<NodeConnectionStatus> nodeStatuses = 
Collections.singletonList(new NodeConnectionStatus(nodeId, 
DisconnectionCode.NOT_YET_CONNECTED));
         final List<ComponentRevision> componentRevisions = 
Collections.singletonList(ComponentRevision.fromRevision(new Revision(8L, 
"client-1", "component-1")));
         msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, 
"instance-1", nodeStatuses, componentRevisions));

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
index 68719fe..bbaeb26 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -26,6 +26,7 @@ import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurati
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceTypesEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
@@ -44,10 +45,12 @@ import 
org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpoint
 import 
org.apache.nifi.cluster.coordination.http.endpoints.OutputPortsEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.PrioritizerTypesEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessorTypesEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger;
@@ -55,6 +58,7 @@ import 
org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEnd
 import 
org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskTypesEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
 import 
org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
@@ -124,6 +128,10 @@ public class StandardHttpResponseMapper implements 
HttpResponseMapper {
         endpointMergers.add(new SystemDiagnosticsEndpointMerger());
         endpointMergers.add(new CountersEndpointMerger());
         endpointMergers.add(new FlowMerger());
+        endpointMergers.add(new ProcessorTypesEndpointMerger());
+        endpointMergers.add(new ControllerServiceTypesEndpointMerger());
+        endpointMergers.add(new ReportingTaskTypesEndpointMerger());
+        endpointMergers.add(new PrioritizerTypesEndpointMerger());
         endpointMergers.add(new ControllerConfigurationEndpointMerger());
         endpointMergers.add(new CurrentUserEndpointMerger());
         endpointMergers.add(new FlowConfigurationEndpointMerger());

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceTypesEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceTypesEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceTypesEndpointMerger.java
new file mode 100644
index 0000000..b0415a5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceTypesEndpointMerger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.manager.DocumentedTypesMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+
+public class ControllerServiceTypesEndpointMerger extends 
AbstractNodeStatusEndpoint<ControllerServiceTypesEntity, 
Set<DocumentedTypeDTO>> {
+    public static final String CONTROLLER_SERVICE_TYPES_URI_PATTERN = 
"/nifi-api/flow/controller-service-types";
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
CONTROLLER_SERVICE_TYPES_URI_PATTERN.equals(uri.getPath());
+    }
+
+    @Override
+    protected Class<ControllerServiceTypesEntity> getEntityClass() {
+        return ControllerServiceTypesEntity.class;
+    }
+
+    @Override
+    protected Set<DocumentedTypeDTO> getDto(ControllerServiceTypesEntity 
entity) {
+        return entity.getControllerServiceTypes();
+    }
+
+    @Override
+    protected void mergeResponses(Set<DocumentedTypeDTO> clientDto, 
Map<NodeIdentifier, Set<DocumentedTypeDTO>> dtoMap, NodeIdentifier 
selectedNodeId) {
+        DocumentedTypesMerger.mergeDocumentedTypes(clientDto, dtoMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PrioritizerTypesEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PrioritizerTypesEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PrioritizerTypesEndpointMerger.java
new file mode 100644
index 0000000..9a9e7a1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PrioritizerTypesEndpointMerger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.manager.DocumentedTypesMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+import org.apache.nifi.web.api.entity.PrioritizerTypesEntity;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+
+public class PrioritizerTypesEndpointMerger extends 
AbstractNodeStatusEndpoint<PrioritizerTypesEntity, Set<DocumentedTypeDTO>> {
+    public static final String PRIORITIZER_TYPES_URI_PATTERN = 
"/nifi-api/flow/prioritizers";
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
PRIORITIZER_TYPES_URI_PATTERN.equals(uri.getPath());
+    }
+
+    @Override
+    protected Class<PrioritizerTypesEntity> getEntityClass() {
+        return PrioritizerTypesEntity.class;
+    }
+
+    @Override
+    protected Set<DocumentedTypeDTO> getDto(PrioritizerTypesEntity entity) {
+        return entity.getPrioritizerTypes();
+    }
+
+    @Override
+    protected void mergeResponses(Set<DocumentedTypeDTO> clientDto, 
Map<NodeIdentifier, Set<DocumentedTypeDTO>> dtoMap, NodeIdentifier 
selectedNodeId) {
+        DocumentedTypesMerger.mergeDocumentedTypes(clientDto, dtoMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorTypesEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorTypesEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorTypesEndpointMerger.java
new file mode 100644
index 0000000..b95f008
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorTypesEndpointMerger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.manager.DocumentedTypesMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+
+public class ProcessorTypesEndpointMerger extends 
AbstractNodeStatusEndpoint<ProcessorTypesEntity, Set<DocumentedTypeDTO>> {
+    public static final String PROCESSOR_TYPES_URI_PATTERN = 
"/nifi-api/flow/processor-types";
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
PROCESSOR_TYPES_URI_PATTERN.equals(uri.getPath());
+    }
+
+    @Override
+    protected Class<ProcessorTypesEntity> getEntityClass() {
+        return ProcessorTypesEntity.class;
+    }
+
+    @Override
+    protected Set<DocumentedTypeDTO> getDto(ProcessorTypesEntity entity) {
+        return entity.getProcessorTypes();
+    }
+
+    @Override
+    protected void mergeResponses(Set<DocumentedTypeDTO> clientDto, 
Map<NodeIdentifier, Set<DocumentedTypeDTO>> dtoMap, NodeIdentifier 
selectedNodeId) {
+        DocumentedTypesMerger.mergeDocumentedTypes(clientDto, dtoMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskTypesEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskTypesEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskTypesEndpointMerger.java
new file mode 100644
index 0000000..4abb6b7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskTypesEndpointMerger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.manager.DocumentedTypesMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+
+public class ReportingTaskTypesEndpointMerger extends 
AbstractNodeStatusEndpoint<ReportingTaskTypesEntity, Set<DocumentedTypeDTO>> {
+    public static final String REPORTING_TASK_TYPES_URI_PATTERN = 
"/nifi-api/flow/reporting-task-types";
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
REPORTING_TASK_TYPES_URI_PATTERN.equals(uri.getPath());
+    }
+
+    @Override
+    protected Class<ReportingTaskTypesEntity> getEntityClass() {
+        return ReportingTaskTypesEntity.class;
+    }
+
+    @Override
+    protected Set<DocumentedTypeDTO> getDto(ReportingTaskTypesEntity entity) {
+        return entity.getReportingTaskTypes();
+    }
+
+    @Override
+    protected void mergeResponses(Set<DocumentedTypeDTO> clientDto, 
Map<NodeIdentifier, Set<DocumentedTypeDTO>> dtoMap, NodeIdentifier 
selectedNodeId) {
+        DocumentedTypesMerger.mergeDocumentedTypes(clientDto, dtoMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index c27f186..754d370 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -703,7 +703,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
                         }
 
                         if (includeDataFlow) {
-                            request.setDataFlow(new 
StandardDataFlow(flowService.createDataFlow()));
+                            request.setDataFlow(new 
StandardDataFlow(flowService.createDataFlowFromController()));
                         }
 
                         
request.setNodeConnectionStatuses(getConnectionStatuses());
@@ -889,7 +889,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         DataFlow dataFlow = null;
         if (flowService != null) {
             try {
-                dataFlow = flowService.createDataFlow();
+                dataFlow = flowService.createDataFlowFromController();
             } catch (final IOException ioe) {
                 logger.error("Unable to obtain current dataflow from 
FlowService in order to provide the flow to "
                     + resolvedNodeIdentifier + ". Will tell node to try again 
later", ioe);

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/DocumentedTypesMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/DocumentedTypesMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/DocumentedTypesMerger.java
new file mode 100644
index 0000000..91ed849
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/DocumentedTypesMerger.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+
+import java.util.Map;
+import java.util.Set;
+
+public final class DocumentedTypesMerger {
+
+    private DocumentedTypesMerger() {}
+
+    /**
+     * Merges the documented types from all nodes in the cluster.
+     *
+     * @param clientDocumentedTypes the selected documented types
+     * @param dtoMap                the documented types from all nodes
+     */
+    public static void mergeDocumentedTypes(final Set<DocumentedTypeDTO> 
clientDocumentedTypes, final Map<NodeIdentifier, Set<DocumentedTypeDTO>> 
dtoMap) {
+        dtoMap.forEach((nodeIdentifier, nodeDocumentedTypes) -> {
+            clientDocumentedTypes.retainAll(nodeDocumentedTypes);
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
index 2553828..5c419e9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
@@ -80,10 +80,19 @@ public class ProcessorEntityMerger implements 
ComponentEntityMerger<ProcessorEnt
             // merge the validation errors and aggregate the property 
descriptors, if authorized
             if (nodeProcessor != null) {
                 final NodeIdentifier nodeId = nodeEntry.getKey();
+
+                // merge the validation errors
                 ErrorMerger.mergeErrors(validationErrorMap, nodeId, 
nodeProcessor.getValidationErrors());
+
+                // aggregate the property descriptors
                 
nodeProcessor.getConfig().getDescriptors().values().stream().forEach(propertyDescriptor
 -> {
                     
propertyDescriptorMap.computeIfAbsent(propertyDescriptor.getName(), 
nodeIdToPropertyDescriptor -> new HashMap<>()).put(nodeId, propertyDescriptor);
                 });
+
+                // if any node does not support multiple versions (null or 
false), make it unavailable
+                if (clientDto.getMultipleVersionsAvailable() == null || 
!Boolean.TRUE.equals(nodeProcessor.getMultipleVersionsAvailable())) {
+                    clientDto.setMultipleVersionsAvailable(Boolean.FALSE);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
index b7f9e82..6447e27 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -139,6 +140,6 @@ public class TestPopularVoteFlowElection {
     }
 
     private DataFlow createDataFlow(final byte[] flow) {
-        return new StandardDataFlow(flow, new byte[0], new byte[0]);
+        return new StandardDataFlow(flow, new byte[0], new byte[0], new 
HashSet<>());
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index e2577a9..1378d3b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -88,7 +89,7 @@ public class TestNodeClusterCoordinator {
         };
 
         final FlowService flowService = Mockito.mock(FlowService.class);
-        final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], 
new byte[50], new byte[50]);
+        final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], 
new byte[50], new byte[50], new HashSet<>());
         Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
         coordinator.setFlowService(flowService);
     }
@@ -143,7 +144,7 @@ public class TestNodeClusterCoordinator {
         };
 
         final NodeIdentifier requestedNodeId = createNodeId(6);
-        final ConnectionRequest request = new 
ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new 
byte[0], new byte[0]));
+        final ConnectionRequest request = new 
ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new 
byte[0], new byte[0], new HashSet<>()));
         final ConnectionRequestMessage requestMsg = new 
ConnectionRequestMessage();
         requestMsg.setConnectionRequest(request);
 
@@ -184,8 +185,8 @@ public class TestNodeClusterCoordinator {
         };
 
         final FlowService flowService = Mockito.mock(FlowService.class);
-        final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], 
new byte[50], new byte[50]);
-        Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
+        final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], 
new byte[50], new byte[50], new HashSet<>());
+        
Mockito.when(flowService.createDataFlowFromController()).thenReturn(dataFlow);
         coordinator.setFlowService(flowService);
         coordinator.setConnected(true);
 
@@ -408,7 +409,7 @@ public class TestNodeClusterCoordinator {
         final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 
8000, "localhost", 9000, "localhost", 10000, 11000, false);
         final NodeIdentifier conflictingId = new NodeIdentifier("1234", 
"localhost", 8001, "localhost", 9000, "localhost", 10000, 11000, false);
 
-        final ConnectionRequest connectionRequest = new ConnectionRequest(id1, 
new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
+        final ConnectionRequest connectionRequest = new ConnectionRequest(id1, 
new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>()));
         final ConnectionRequestMessage crm = new ConnectionRequestMessage();
         crm.setConnectionRequest(connectionRequest);
 
@@ -419,7 +420,7 @@ public class TestNodeClusterCoordinator {
         final NodeIdentifier resolvedNodeId = 
responseMessage.getConnectionResponse().getNodeIdentifier();
         assertEquals(id1, resolvedNodeId);
 
-        final ConnectionRequest conRequest2 = new 
ConnectionRequest(conflictingId, new StandardDataFlow(new byte[0], new byte[0], 
new byte[0]));
+        final ConnectionRequest conRequest2 = new 
ConnectionRequest(conflictingId, new StandardDataFlow(new byte[0], new byte[0], 
new byte[0], new HashSet<>()));
         final ConnectionRequestMessage crm2 = new ConnectionRequestMessage();
         crm2.setConnectionRequest(conRequest2);
 
@@ -442,7 +443,7 @@ public class TestNodeClusterCoordinator {
     }
 
     private ProtocolMessage requestConnection(final NodeIdentifier 
requestedNodeId, final NodeClusterCoordinator coordinator) {
-        final ConnectionRequest request = new 
ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new 
byte[0], new byte[0]));
+        final ConnectionRequest request = new 
ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new 
byte[0], new byte[0], new HashSet<>()));
         final ConnectionRequestMessage requestMsg = new 
ConnectionRequestMessage();
         requestMsg.setConnectionRequest(request);
         return coordinator.handle(requestMsg);

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java
index fb80ab2..659ec78 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java
@@ -112,4 +112,5 @@ public class NopStateProvider implements StateProvider {
     public Scope[] getSupportedScopes() {
         return new Scope[] {Scope.LOCAL};
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
index 44b2a4e..d824231 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 # Core Properties #
-nifi.version=nifi-test 3.0.0
 nifi.flow.configuration.file=./target/flow.xml.gz
 nifi.flow.configuration.archive.dir=./target/archive/
 nifi.flowcontroller.autoResumeState=true

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
index 7c0c5ac..6e2b9fe 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.cluster.protocol;
 
+import java.util.Set;
+
 public interface DataFlow {
 
     /**
@@ -34,4 +36,9 @@ public interface DataFlow {
      */
     public byte[] getAuthorizerFingerprint();
 
+    /**
+     * @return the component ids of components that were created as a missing 
ghost component
+     */
+    public Set<String> getMissingComponents();
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index 1053463..345ce64 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -18,13 +18,15 @@ package org.apache.nifi.controller;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.InstanceClassLoader;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.registry.VariableRegistry;
@@ -43,6 +45,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -50,7 +53,6 @@ import java.util.concurrent.locks.ReentrantLock;
 public abstract class AbstractConfiguredComponent implements 
ConfigurableComponent, ConfiguredComponent {
 
     private final String id;
-    private final ConfigurableComponent component;
     private final ValidationContextFactory validationContextFactory;
     private final ControllerServiceProvider serviceProvider;
     private final AtomicReference<String> name;
@@ -58,25 +60,24 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
     private final String componentType;
     private final String componentCanonicalClass;
     private final VariableRegistry variableRegistry;
-    private final ComponentLog logger;
 
+    private final AtomicBoolean isExtensionMissing;
 
     private final Lock lock = new ReentrantLock();
     private final ConcurrentMap<PropertyDescriptor, String> properties = new 
ConcurrentHashMap<>();
 
-    public AbstractConfiguredComponent(final ConfigurableComponent component, 
final String id,
+    public AbstractConfiguredComponent(final String id,
                                        final ValidationContextFactory 
validationContextFactory, final ControllerServiceProvider serviceProvider,
                                        final String componentType, final 
String componentCanonicalClass, final VariableRegistry variableRegistry,
-                                       final ComponentLog logger) {
+                                       final boolean isExtensionMissing) {
         this.id = id;
-        this.component = component;
         this.validationContextFactory = validationContextFactory;
         this.serviceProvider = serviceProvider;
-        this.name = new 
AtomicReference<>(component.getClass().getSimpleName());
+        this.name = new AtomicReference<>(componentType);
         this.componentType = componentType;
         this.componentCanonicalClass = componentCanonicalClass;
         this.variableRegistry = variableRegistry;
-        this.logger = logger;
+        this.isExtensionMissing = new AtomicBoolean(isExtensionMissing);
     }
 
     @Override
@@ -85,6 +86,16 @@ public abstract class AbstractConfiguredComponent implements 
ConfigurableCompone
     }
 
     @Override
+    public void setExtensionMissing(boolean extensionMissing) {
+        this.isExtensionMissing.set(extensionMissing);
+    }
+
+    @Override
+    public boolean isExtensionMissing() {
+        return isExtensionMissing.get();
+    }
+
+    @Override
     public String getName() {
         return name.get();
     }
@@ -114,11 +125,11 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
         try {
             verifyModifiable();
 
-            try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(component.getClass(), id)) {
+            try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getComponent().getClass(), id)) {
                 boolean classpathChanged = false;
                 for (final Map.Entry<String, String> entry : 
properties.entrySet()) {
                     // determine if any of the property changes require 
resetting the InstanceClassLoader
-                    final PropertyDescriptor descriptor = 
component.getPropertyDescriptor(entry.getKey());
+                    final PropertyDescriptor descriptor = 
getComponent().getPropertyDescriptor(entry.getKey());
                     if (descriptor.isDynamicClasspathModifier()) {
                         classpathChanged = true;
                     }
@@ -155,7 +166,7 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
             throw new IllegalArgumentException("Name or Value can not be 
null");
         }
 
-        final PropertyDescriptor descriptor = 
component.getPropertyDescriptor(name);
+        final PropertyDescriptor descriptor = 
getComponent().getPropertyDescriptor(name);
 
         final String oldValue = properties.put(descriptor, value);
         if (!value.equals(oldValue)) {
@@ -175,7 +186,7 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
             }
 
             try {
-                component.onPropertyModified(descriptor, oldValue, value);
+                getComponent().onPropertyModified(descriptor, oldValue, value);
             } catch (final Exception e) {
                 // nothing really to do here...
             }
@@ -197,7 +208,7 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
             throw new IllegalArgumentException("Name can not be null");
         }
 
-        final PropertyDescriptor descriptor = 
component.getPropertyDescriptor(name);
+        final PropertyDescriptor descriptor = 
getComponent().getPropertyDescriptor(name);
         String value = null;
         if (!descriptor.isRequired() && (value = 
properties.remove(descriptor)) != null) {
 
@@ -211,9 +222,9 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
             }
 
             try {
-                component.onPropertyModified(descriptor, value, null);
+                getComponent().onPropertyModified(descriptor, value, null);
             } catch (final Exception e) {
-                logger.error(e.getMessage(), e);
+                getLogger().error(e.getMessage(), e);
             }
 
             return true;
@@ -231,10 +242,10 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
         try {
             final URL[] urls = 
ClassLoaderUtils.getURLsForClasspath(modulePaths, null, true);
 
-            if (logger.isDebugEnabled()) {
-                logger.debug("Adding {} resources to the classpath for {}", 
new Object[] {urls.length, name});
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug("Adding {} resources to the classpath for 
{}", new Object[] {urls.length, name});
                 for (URL url : urls) {
-                    logger.debug(url.getFile());
+                    getLogger().debug(url.getFile());
                 }
             }
 
@@ -243,8 +254,8 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
             if (!(classLoader instanceof InstanceClassLoader)) {
                 // Really shouldn't happen, but if we somehow got here and 
don't have an InstanceClassLoader then log a warning and move on
                 final String classLoaderName = classLoader == null ? "null" : 
classLoader.getClass().getName();
-                if (logger.isWarnEnabled()) {
-                    logger.warn(String.format("Unable to modify the classpath 
for %s, expected InstanceClassLoader, but found %s", name, classLoaderName));
+                if (getLogger().isWarnEnabled()) {
+                    getLogger().warn(String.format("Unable to modify the 
classpath for %s, expected InstanceClassLoader, but found %s", name, 
classLoaderName));
                 }
                 return;
             }
@@ -253,14 +264,14 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
             instanceClassLoader.setInstanceResources(urls);
         } catch (MalformedURLException e) {
             // Shouldn't get here since we are suppressing errors
-            logger.warn("Error processing classpath resources", e);
+            getLogger().warn("Error processing classpath resources", e);
         }
     }
 
     @Override
     public Map<PropertyDescriptor, String> getProperties() {
-        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(component.getClass(), 
component.getIdentifier())) {
-            final List<PropertyDescriptor> supported = 
component.getPropertyDescriptors();
+        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getComponent().getClass(), 
getComponent().getIdentifier())) {
+            final List<PropertyDescriptor> supported = 
getComponent().getPropertyDescriptors();
             if (supported == null || supported.isEmpty()) {
                 return Collections.unmodifiableMap(properties);
             } else {
@@ -303,36 +314,124 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
 
     @Override
     public String toString() {
-        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(component.getClass(), 
component.getIdentifier())) {
-            return component.toString();
+        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getComponent().getClass(), 
getComponent().getIdentifier())) {
+            return getComponent().toString();
         }
     }
 
     @Override
     public Collection<ValidationResult> validate(final ValidationContext 
context) {
-        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(component.getClass(), 
component.getIdentifier())) {
-            return component.validate(context);
+        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getComponent().getClass(), 
getComponent().getIdentifier())) {
+            final Collection<ValidationResult> validationResults = 
getComponent().validate(context);
+
+            // validate selected controller services implement the API 
required by the processor
+
+            final List<PropertyDescriptor> supportedDescriptors = 
getComponent().getPropertyDescriptors();
+            if (null != supportedDescriptors) {
+                for (final PropertyDescriptor descriptor : 
supportedDescriptors) {
+                    if (descriptor.getControllerServiceDefinition() == null) {
+                        // skip properties that aren't for a controller service
+                        continue;
+                    }
+
+                    final String controllerServiceId = 
context.getProperty(descriptor).getValue();
+                    if (controllerServiceId == null) {
+                        // if the property value is null we should already 
have a validation error
+                        continue;
+                    }
+
+                    final ControllerServiceNode controllerServiceNode = 
getControllerServiceProvider().getControllerServiceNode(controllerServiceId);
+                    if (controllerServiceNode == null) {
+                        // if the node was null we should already have a 
validation error
+                        continue;
+                    }
+
+                    final Class<? extends ControllerService> 
controllerServiceApiClass = descriptor.getControllerServiceDefinition();
+                    final ClassLoader controllerServiceApiClassLoader = 
controllerServiceApiClass.getClassLoader();
+
+                    final Bundle controllerServiceApiBundle = 
ExtensionManager.getBundle(controllerServiceApiClassLoader);
+                    final BundleCoordinate controllerServiceApiCoordinate = 
controllerServiceApiBundle.getBundleDetails().getCoordinate();
+
+                    final Bundle controllerServiceBundle = 
ExtensionManager.getBundle(controllerServiceNode.getBundleCoordinate());
+                    final BundleCoordinate controllerServiceCoordinate = 
controllerServiceBundle.getBundleDetails().getCoordinate();
+
+                    final boolean matchesApi = 
matchesApi(controllerServiceBundle, controllerServiceApiCoordinate);
+
+                    if (!matchesApi) {
+                        final String controllerServiceType = 
controllerServiceNode.getComponentType();
+                        final String controllerServiceApiType = 
controllerServiceApiClass.getSimpleName();
+
+                        final String explanation = new StringBuilder()
+                                .append(controllerServiceType).append(" - 
").append(controllerServiceCoordinate.getVersion())
+                                .append(" from 
").append(controllerServiceCoordinate.getGroup()).append(" - 
").append(controllerServiceCoordinate.getId())
+                                .append(" is not compatible with 
").append(controllerServiceApiType).append(" - 
").append(controllerServiceApiCoordinate.getVersion())
+                                .append(" from 
").append(controllerServiceApiCoordinate.getGroup()).append(" - 
").append(controllerServiceApiCoordinate.getId())
+                                .toString();
+
+                        validationResults.add(new ValidationResult.Builder()
+                                .input(controllerServiceId)
+                                .subject(descriptor.getDisplayName())
+                                .valid(false)
+                                .explanation(explanation)
+                                .build());
+                    }
+
+                }
+            }
+
+            return validationResults;
+        }
+    }
+
+    /**
+     * Determines if the given controller service node has the required API as 
an ancestor.
+     *
+     * @param controllerServiceImplBundle the bundle of a controller service 
being referenced by a processor
+     * @param requiredApiCoordinate the controller service API required by the 
processor
+     * @return true if the controller service node has the require API as an 
ancestor, false otherwise
+     */
+    private boolean matchesApi(final Bundle controllerServiceImplBundle, final 
BundleCoordinate requiredApiCoordinate) {
+        // start with the coordinate of the controller service for cases where 
the API and service are in the same bundle
+        BundleCoordinate controllerServiceDependencyCoordinate = 
controllerServiceImplBundle.getBundleDetails().getCoordinate();
+
+        boolean foundApiDependency = false;
+        while (controllerServiceDependencyCoordinate != null) {
+            // determine if the dependency coordinate matches the required API
+            if 
(requiredApiCoordinate.equals(controllerServiceDependencyCoordinate)) {
+                foundApiDependency = true;
+                break;
+            }
+
+            // move to the next dependency in the chain, or stop if null
+            final Bundle controllerServiceDependencyBundle = 
ExtensionManager.getBundle(controllerServiceDependencyCoordinate);
+            if (controllerServiceDependencyBundle == null) {
+                controllerServiceDependencyCoordinate = null;
+            } else {
+                controllerServiceDependencyCoordinate = 
controllerServiceDependencyBundle.getBundleDetails().getDependencyCoordinate();
+            }
         }
+
+        return foundApiDependency;
     }
 
     @Override
     public PropertyDescriptor getPropertyDescriptor(final String name) {
-        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(component.getClass(), 
component.getIdentifier())) {
-            return component.getPropertyDescriptor(name);
+        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getComponent().getClass(), 
getComponent().getIdentifier())) {
+            return getComponent().getPropertyDescriptor(name);
         }
     }
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
-        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(component.getClass(), 
component.getIdentifier())) {
-            component.onPropertyModified(descriptor, oldValue, newValue);
+        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getComponent().getClass(), 
getComponent().getIdentifier())) {
+            getComponent().onPropertyModified(descriptor, oldValue, newValue);
         }
     }
 
     @Override
     public List<PropertyDescriptor> getPropertyDescriptors() {
-        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(component.getClass(), 
component.getIdentifier())) {
-            return component.getPropertyDescriptors();
+        try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getComponent().getClass(), 
getComponent().getIdentifier())) {
+            return getComponent().getPropertyDescriptors();
         }
     }
 
@@ -363,8 +462,8 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
                 serviceIdentifiersNotToValidate, getProperties(), 
getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
 
             final Collection<ValidationResult> validationResults;
-            try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(component.getClass(), 
component.getIdentifier())) {
-                validationResults = component.validate(validationContext);
+            try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(getComponent().getClass(), 
getComponent().getIdentifier())) {
+                validationResults = getComponent().validate(validationContext);
             }
 
             for (final ValidationResult result : validationResults) {
@@ -407,4 +506,19 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
         return this.variableRegistry;
     }
 
+    @Override
+    public void verifyCanUpdateBundle(final BundleCoordinate 
incomingCoordinate) throws IllegalArgumentException {
+        final BundleCoordinate existingCoordinate = getBundleCoordinate();
+
+        // determine if this update is changing the bundle for the processor
+        if (!existingCoordinate.equals(incomingCoordinate)) {
+            // if it is changing the bundle, only allow it to change to a 
different version within same group and id
+            if 
(!existingCoordinate.getGroup().equals(incomingCoordinate.getGroup())
+                    || 
!existingCoordinate.getId().equals(incomingCoordinate.getId())) {
+                throw new IllegalArgumentException(String.format(
+                        "Unable to update component %s from %s to %s because 
bundle group and id must be the same.",
+                        getIdentifier(), existingCoordinate.getCoordinate(), 
incomingCoordinate.getCoordinate()));
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
index a0a6060..1c0b7c1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
@@ -24,8 +24,11 @@ import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.resource.ComponentAuthorizable;
 import org.apache.nifi.authorization.resource.RestrictedComponentsAuthorizable;
 import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
 
 import java.util.Collection;
 import java.util.Map;
@@ -50,6 +53,18 @@ public interface ConfiguredComponent extends 
ComponentAuthorizable {
 
     boolean isValid();
 
+    BundleCoordinate getBundleCoordinate();
+
+    ConfigurableComponent getComponent();
+
+    ComponentLog getLogger();
+
+    boolean isExtensionMissing();
+
+    void setExtensionMissing(boolean extensionMissing);
+
+    void verifyCanUpdateBundle(BundleCoordinate bundleCoordinate) throws 
IllegalStateException;
+
     /**
      * @return the any validation errors for this connectable
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/LoggableComponent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/LoggableComponent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/LoggableComponent.java
new file mode 100644
index 0000000..6d03675
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/LoggableComponent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.logging.ComponentLog;
+
+/**
+ * Holder to pass around a ConfigurableComponent with its coordinate and 
logger.
+ *
+ * @param <T> the type of ConfigurableComponent
+ */
+public class LoggableComponent<T extends ConfigurableComponent> {
+
+    private final T component;
+
+    private final BundleCoordinate bundleCoordinate;
+
+    private final ComponentLog logger;
+
+    public LoggableComponent(final T component, final BundleCoordinate 
bundleCoordinate, final ComponentLog logger) {
+        this.component = component;
+        this.bundleCoordinate = bundleCoordinate;
+        this.logger = logger;
+    }
+
+    public T getComponent() {
+        return component;
+    }
+
+    public BundleCoordinate getBundleCoordinate() {
+        return bundleCoordinate;
+    }
+
+    public ComponentLog getLogger() {
+        return logger;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 8f405e0..8c78958 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -16,40 +16,39 @@
  */
 package org.apache.nifi.controller;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.scheduling.ScheduleState;
 import org.apache.nifi.controller.scheduling.SchedulingAgent;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.registry.VariableRegistry;
-import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 public abstract class ProcessorNode extends AbstractConfiguredComponent 
implements Connectable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ProcessorNode.class);
 
     protected final AtomicReference<ScheduledState> scheduledState;
 
-    public ProcessorNode(final Processor processor, final String id,
+    public ProcessorNode(final String id,
                          final ValidationContextFactory 
validationContextFactory, final ControllerServiceProvider serviceProvider,
                          final String componentType, final String 
componentCanonicalClass, final VariableRegistry variableRegistry,
-                         final ComponentLog logger) {
-        super(processor, id, validationContextFactory, serviceProvider, 
componentType, componentCanonicalClass, variableRegistry, logger);
+                         final boolean isExtensionMissing) {
+        super(id, validationContextFactory, serviceProvider, componentType, 
componentCanonicalClass, variableRegistry, isExtensionMissing);
         this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
     }
 
@@ -77,6 +76,8 @@ public abstract class ProcessorNode extends 
AbstractConfiguredComponent implemen
 
     public abstract Processor getProcessor();
 
+    public abstract void setProcessor(LoggableComponent<Processor> processor);
+
     public abstract void yield(long period, TimeUnit timeUnit);
 
     public abstract void setAutoTerminatedRelationships(Set<Relationship> 
relationships);

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index 11b6cb6..22cc3b5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -47,6 +47,8 @@ public interface ReportingTaskNode extends 
ConfiguredComponent {
 
     ReportingTask getReportingTask();
 
+    void setReportingTask(LoggableComponent<ReportingTask> reportingTask);
+
     ReportingContext getReportingContext();
 
     ConfigurationContext getConfigurationContext();

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
index c4aba44..c979745 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
@@ -24,4 +24,7 @@ public class ProcessorInstantiationException extends 
Exception {
         super(className, t);
     }
 
+    public ProcessorInstantiationException(final String className) {
+        super(className);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
index ee0bf50..b48f198 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.controller.reporting;
 
 import java.util.Set;
+
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.controller.ReportingTaskNode;
 
 /**
@@ -31,6 +33,7 @@ public interface ReportingTaskProvider {
      * @param type the type (fully qualified class name) of the reporting task
      * to instantiate
      * @param id the identifier for the Reporting Task
+     * @param bundleCoordinate the bundle coordinate for the type of reporting 
task
      * @param firstTimeAdded whether or not this is the first time that the
      * reporting task is being added to the flow. I.e., this will be true only
      * when the user adds the reporting task to the flow, not when the flow is
@@ -41,7 +44,7 @@ public interface ReportingTaskProvider {
      * @throws ReportingTaskInstantiationException if unable to create the
      * Reporting Task
      */
-    ReportingTaskNode createReportingTask(String type, String id, boolean 
firstTimeAdded) throws ReportingTaskInstantiationException;
+    ReportingTaskNode createReportingTask(String type, String id, 
BundleCoordinate bundleCoordinate, boolean firstTimeAdded) throws 
ReportingTaskInstantiationException;
 
     /**
      * @param identifier of node
@@ -108,4 +111,15 @@ public interface ReportingTaskProvider {
      * STOPPED, or if the Reporting Task has active threads
      */
     void disableReportingTask(ReportingTaskNode reportingTask);
+
+    /**
+     * Changes the underlying ReportingTask held by the node to an instance of 
the new type.
+     *
+     * @param reportingTask the ReportingTaskNode being updated
+     * @param newType the fully qualified class name of the new type
+     * @param bundleCoordinate the bundle coordinate of the new type
+     * @throws ReportingTaskInstantiationException if unable to create an 
instance of the new type
+     */
+    void changeReportingTaskType(final ReportingTaskNode reportingTask, final 
String newType, final BundleCoordinate bundleCoordinate) throws 
ReportingTaskInstantiationException;
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceInvocationHandler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceInvocationHandler.java
new file mode 100644
index 0000000..0d03e98
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceInvocationHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import java.lang.reflect.InvocationHandler;
+
+public interface ControllerServiceInvocationHandler extends InvocationHandler {
+
+    /**
+     * Allows changing the underlying node that is used by this invocation 
handler to check the state of the service.
+     *
+     * @param serviceNode the node to be used by this invocation handler
+     */
+    void setServiceNode(final ControllerServiceNode serviceNode);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index c0ff480..8fe4eb2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.service;
 
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.LoggableComponent;
 import org.apache.nifi.groups.ProcessGroup;
 
 import java.util.List;
@@ -52,6 +53,11 @@ public interface ControllerServiceNode extends 
ConfiguredComponent {
     ControllerService getProxiedControllerService();
 
     /**
+     * @return the invocation handler being used by the proxy
+     */
+    ControllerServiceInvocationHandler getInvocationHandler();
+
+    /**
      * Returns the list of services that are required to be enabled before this
      * service is enabled. The returned list is flattened and contains both
      * immediate and transient dependencies.
@@ -167,4 +173,16 @@ public interface ControllerServiceNode extends 
ConfiguredComponent {
      * {@link #disable(ScheduledExecutorService)}.
      */
     boolean isActive();
+
+    /**
+     * Sets a new proxy and implementation for this node.
+     *
+     * @param implementation the actual implementation controller service
+     * @param proxiedControllerService the proxied controller service
+     * @param invocationHandler the invocation handler being used by the proxy
+     */
+    void setControllerServiceAndProxy(final 
LoggableComponent<ControllerService> implementation,
+                                      final 
LoggableComponent<ControllerService> proxiedControllerService,
+                                      final ControllerServiceInvocationHandler 
invocationHandler);
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 51d54a0..d1111e6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -20,6 +20,7 @@ import java.util.Collection;
 import java.util.Set;
 
 import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
@@ -36,10 +37,11 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      *
      * @param type of service
      * @param id of service
+     * @param bundleCoordinate the coordinate of the bundle for the service
      * @param firstTimeAdded for service
      * @return the service node
      */
-    ControllerServiceNode createControllerService(String type, String id, 
boolean firstTimeAdded);
+    ControllerServiceNode createControllerService(String type, String id, 
BundleCoordinate bundleCoordinate, boolean firstTimeAdded);
 
     /**
      * @param id of the service

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java
index 53dbb01..4f5d284 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java
@@ -103,11 +103,19 @@ public interface FlowService extends LifeCycle {
     void copyCurrentFlow(OutputStream os) throws IOException;
 
     /**
-     * Creates a DataFlow object from the current flow
+     * Creates a DataFlow object by first looking for a flow on from disk, and 
falling back to the controller's flow otherwise.
      *
      * @return the created DataFlow object
      *
      * @throws IOException if unable to read the flow from disk
      */
     DataFlow createDataFlow() throws IOException;
+
+    /**
+     * Creates a DataFlow object by serializing the flow controller's flow.
+     *
+     * @return the created DataFlow object.
+     */
+    DataFlow createDataFlowFromController() throws IOException;
+
 }

Reply via email to