http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 36a9524..35686a5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,31 @@
  */
 package org.apache.nifi.web;
 
-import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
 import org.apache.nifi.action.FlowChangeAction;
@@ -84,6 +108,7 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.history.History;
 import org.apache.nifi.history.HistoryQuery;
 import org.apache.nifi.history.PreviousValue;
+import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
@@ -92,6 +117,7 @@ import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.api.dto.AccessPolicyDTO;
 import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
+import org.apache.nifi.web.api.dto.AffectedComponentDTO;
 import org.apache.nifi.web.api.dto.BulletinBoardDTO;
 import org.apache.nifi.web.api.dto.BulletinDTO;
 import org.apache.nifi.web.api.dto.BulletinQueryDTO;
@@ -137,6 +163,7 @@ import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.dto.UserDTO;
 import org.apache.nifi.web.api.dto.UserGroupDTO;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
 import org.apache.nifi.web.api.dto.action.HistoryDTO;
 import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -157,6 +184,7 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.entity.AccessPolicyEntity;
 import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
 import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
 import org.apache.nifi.web.api.entity.BulletinEntity;
 import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
@@ -190,6 +218,8 @@ import org.apache.nifi.web.api.entity.TemplateEntity;
 import org.apache.nifi.web.api.entity.TenantEntity;
 import org.apache.nifi.web.api.entity.UserEntity;
 import org.apache.nifi.web.api.entity.UserGroupEntity;
+import org.apache.nifi.web.api.entity.VariableEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.dao.AccessPolicyDAO;
 import org.apache.nifi.web.dao.ConnectionDAO;
@@ -217,28 +247,7 @@ import org.apache.nifi.web.util.SnippetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
 
 /**
  * Implementation of NiFiServiceFacade that performs revision checking.
@@ -423,6 +432,11 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public void verifyActivateControllerServices(final String groupId, final 
ControllerServiceState state, final Set<String> serviceIds) {
+        processGroupDAO.verifyActivateControllerServices(groupId, state, 
serviceIds);
+    }
+
+    @Override
     public void verifyDeleteProcessGroup(final String groupId) {
         processGroupDAO.verifyDelete(groupId);
     }
@@ -624,6 +638,10 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
      */
     private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, 
final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, 
D> dtoCreation) {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        return updateComponent(user, revision, authorizable, daoUpdate, 
dtoCreation);
+    }
+
+    private <D, C> RevisionUpdate<D> updateComponent(final NiFiUser user, 
final Revision revision, final Authorizable authorizable, final Supplier<C> 
daoUpdate, final Function<C, D> dtoCreation) {
         try {
             final RevisionUpdate<D> updatedComponent = 
revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new 
UpdateRevisionTask<D>() {
                 @Override
@@ -774,6 +792,81 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public Set<AffectedComponentDTO> 
getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO 
variableRegistryDto) {
+        final ProcessGroup group = 
processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
+        if (group == null) {
+            throw new ResourceNotFoundException("Could not find Process Group 
with ID " + variableRegistryDto.getProcessGroupId());
+        }
+
+        final Map<String, String> variableMap = new HashMap<>();
+        variableRegistryDto.getVariables().stream() // have to use forEach 
here instead of using Collectors.toMap because value may be null
+            .map(VariableEntity::getVariable)
+            .forEach(var -> variableMap.put(var.getName(), var.getValue()));
+
+        final Set<AffectedComponentDTO> affectedComponentDtos = new 
HashSet<>();
+
+        final Set<String> updatedVariableNames = getUpdatedVariables(group, 
variableMap);
+        for (final String variableName : updatedVariableNames) {
+            final Set<ConfiguredComponent> affectedComponents = 
group.getComponentsAffectedByVariable(variableName);
+
+            for (final ConfiguredComponent component : affectedComponents) {
+                if (component instanceof ProcessorNode) {
+                    final ProcessorNode procNode = (ProcessorNode) component;
+                    if (procNode.isRunning()) {
+                        
affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode));
+                    }
+                } else if (component instanceof ControllerServiceNode) {
+                    final ControllerServiceNode serviceNode = 
(ControllerServiceNode) component;
+                    if (serviceNode.isActive()) {
+                        
affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode));
+                    }
+                } else {
+                    throw new RuntimeException("Found unexpected type of 
Component [" + component.getCanonicalClassName() + "] dependending on 
variable");
+                }
+            }
+        }
+
+        return affectedComponentDtos;
+    }
+
+    private Set<String> getUpdatedVariables(final ProcessGroup group, final 
Map<String, String> newVariableValues) {
+        final Set<String> updatedVariableNames = new HashSet<>();
+
+        final ComponentVariableRegistry registry = group.getVariableRegistry();
+        for (final Map.Entry<String, String> entry : 
newVariableValues.entrySet()) {
+            final String varName = entry.getKey();
+            final String newValue = entry.getValue();
+
+            final String curValue = registry.getVariableValue(varName);
+            if (!Objects.equals(newValue, curValue)) {
+                updatedVariableNames.add(varName);
+            }
+        }
+
+        return updatedVariableNames;
+    }
+
+
+    @Override
+    public VariableRegistryEntity updateVariableRegistry(Revision revision, 
VariableRegistryDTO variableRegistryDto) {
+        return updateVariableRegistry(NiFiUserUtils.getNiFiUser(), revision, 
variableRegistryDto);
+    }
+
+    @Override
+    public VariableRegistryEntity updateVariableRegistry(NiFiUser user, 
Revision revision, VariableRegistryDTO variableRegistryDto) {
+        final ProcessGroup processGroupNode = 
processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
+        final RevisionUpdate<VariableRegistryDTO> snapshot = 
updateComponent(user, revision,
+            processGroupNode,
+            () -> processGroupDAO.updateVariableRegistry(variableRegistryDto),
+            processGroup -> 
dtoFactory.createVariableRegistryDto(processGroup));
+
+        final PermissionsDTO permissions = 
dtoFactory.createPermissionsDto(processGroupNode);
+        final RevisionDTO updatedRevision = 
dtoFactory.createRevisionDTO(snapshot.getLastModification());
+        return 
entityFactory.createVariableRegistryEntity(snapshot.getComponent(), 
updatedRevision, permissions);
+    }
+
+
+    @Override
     public ProcessGroupEntity updateProcessGroup(final Revision revision, 
final ProcessGroupDTO processGroupDTO) {
         final ProcessGroup processGroupNode = 
processGroupDAO.getProcessGroup(processGroupDTO.getId());
         final RevisionUpdate<ProcessGroupDTO> snapshot = 
updateComponent(revision,
@@ -790,14 +883,27 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
+        if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
+            processGroupDAO.verifyUpdate(processGroupDTO);
+        }
+    }
+
+    @Override
     public ScheduleComponentsEntity scheduleComponents(final String 
processGroupId, final ScheduledState state, final Map<String, Revision> 
componentRevisions) {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        return scheduleComponents(user, processGroupId, state, 
componentRevisions);
+    }
+
+    @Override
+    public ScheduleComponentsEntity scheduleComponents(final NiFiUser user, 
final String processGroupId, final ScheduledState state, final Map<String, 
Revision> componentRevisions) {
+
         final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = 
revisionManager.updateRevision(new 
StandardRevisionClaim(componentRevisions.values()), user, new
                 UpdateRevisionTask<ScheduleComponentsEntity>() {
                     @Override
                     public RevisionUpdate<ScheduleComponentsEntity> update() {
                         // schedule the components
-                        processGroupDAO.scheduleComponents(processGroupId, 
state, componentRevisions.keySet());
+                processGroupDAO.scheduleComponents(processGroupId, state, 
componentRevisions.keySet());
 
                         // update the revisions
                         final Map<String, Revision> updatedRevisions = new 
HashMap<>();
@@ -821,6 +927,46 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public ActivateControllerServicesEntity activateControllerServices(final 
String processGroupId, final ControllerServiceState state, final Map<String, 
Revision> serviceRevisions) {
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        return activateControllerServices(user, processGroupId, state, 
serviceRevisions);
+    }
+
+    @Override
+    public ActivateControllerServicesEntity activateControllerServices(final 
NiFiUser user, final String processGroupId, final ControllerServiceState state,
+        final Map<String, Revision> serviceRevisions) {
+
+        final RevisionUpdate<ActivateControllerServicesEntity> 
updatedComponent = revisionManager.updateRevision(new 
StandardRevisionClaim(serviceRevisions.values()), user,
+            new UpdateRevisionTask<ActivateControllerServicesEntity>() {
+                @Override
+                public RevisionUpdate<ActivateControllerServicesEntity> 
update() {
+                    // schedule the components
+                    processGroupDAO.activateControllerServices(processGroupId, 
state, serviceRevisions.keySet());
+
+                    // update the revisions
+                    final Map<String, Revision> updatedRevisions = new 
HashMap<>();
+                    for (final Revision revision : serviceRevisions.values()) {
+                        final Revision currentRevision = 
revisionManager.getRevision(revision.getComponentId());
+                        updatedRevisions.put(revision.getComponentId(), 
currentRevision.incrementRevision(revision.getClientId()));
+                    }
+
+                    // save
+                    controllerFacade.save();
+
+                    // gather details for response
+                    final ActivateControllerServicesEntity entity = new 
ActivateControllerServicesEntity();
+                    entity.setId(processGroupId);
+                    entity.setState(state.name());
+                    return new StandardRevisionUpdate<>(entity, null, new 
HashSet<>(updatedRevisions.values()));
+                }
+            });
+
+        return updatedComponent.getComponent();
+    }
+
+
+    @Override
     public ControllerConfigurationEntity updateControllerConfiguration(final 
Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) 
{
         final RevisionUpdate<ControllerConfigurationDTO> updatedComponent = 
updateComponent(
                 revision,
@@ -3062,7 +3208,7 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         return createProcessGroupEntity(processGroup);
     }
 
-    private ControllerServiceEntity createControllerServiceEntity(final 
ControllerServiceNode serviceNode, final Set<String> serviceIds) {
+    private ControllerServiceEntity createControllerServiceEntity(final 
ControllerServiceNode serviceNode, final Set<String> serviceIds, final NiFiUser 
user) {
         final ControllerServiceDTO dto = 
dtoFactory.createControllerServiceDto(serviceNode);
 
         final ControllerServiceReference ref = serviceNode.getReferences();
@@ -3070,26 +3216,77 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         
dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
 
         final RevisionDTO revision = 
dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()));
-        final PermissionsDTO permissions = 
dtoFactory.createPermissionsDto(serviceNode);
+        final PermissionsDTO permissions = 
dtoFactory.createPermissionsDto(serviceNode, user);
         final List<BulletinDTO> bulletins = 
dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier()));
         final List<BulletinEntity> bulletinEntities = 
bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, 
permissions.getCanRead())).collect(Collectors.toList());
         return entityFactory.createControllerServiceEntity(dto, revision, 
permissions, bulletinEntities);
     }
 
     @Override
-    public Set<ControllerServiceEntity> getControllerServices(final String 
groupId) {
-        final Set<ControllerServiceNode> serviceNodes = 
controllerServiceDAO.getControllerServices(groupId);
+    public VariableRegistryEntity getVariableRegistry(final String groupId, 
final boolean includeAncestorGroups) {
+        final ProcessGroup processGroup = 
processGroupDAO.getProcessGroup(groupId);
+        if (processGroup == null) {
+            throw new ResourceNotFoundException("Could not find group with ID 
" + groupId);
+        }
+
+        return createVariableRegistryEntity(processGroup, 
includeAncestorGroups);
+    }
+
+    private VariableRegistryEntity createVariableRegistryEntity(final 
ProcessGroup processGroup, final boolean includeAncestorGroups) {
+        final VariableRegistryDTO registryDto = 
dtoFactory.createVariableRegistryDto(processGroup);
+        final RevisionDTO revision = 
dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
+        final PermissionsDTO permissions = 
dtoFactory.createPermissionsDto(processGroup);
+
+        if (includeAncestorGroups) {
+            ProcessGroup parent = processGroup.getParent();
+            while (parent != null) {
+                final PermissionsDTO parentPerms = 
dtoFactory.createPermissionsDto(processGroup);
+                if (Boolean.TRUE.equals(parentPerms.getCanRead())) {
+                    final VariableRegistryDTO parentRegistryDto = 
dtoFactory.createVariableRegistryDto(parent);
+                    final Set<VariableEntity> parentVariables = 
parentRegistryDto.getVariables();
+                    registryDto.getVariables().addAll(parentVariables);
+                }
+
+                parent = parent.getParent();
+            }
+        }
+
+        return entityFactory.createVariableRegistryEntity(registryDto, 
revision, permissions);
+    }
+
+    @Override
+    public VariableRegistryEntity populateAffectedComponents(final 
VariableRegistryDTO variableRegistryDto) {
+        final String groupId = variableRegistryDto.getProcessGroupId();
+        final ProcessGroup processGroup = 
processGroupDAO.getProcessGroup(groupId);
+        if (processGroup == null) {
+            throw new ResourceNotFoundException("Could not find group with ID 
" + groupId);
+        }
+
+        final VariableRegistryDTO registryDto = 
dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup);
+        final RevisionDTO revision = 
dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
+        final PermissionsDTO permissions = 
dtoFactory.createPermissionsDto(processGroup);
+        return entityFactory.createVariableRegistryEntity(registryDto, 
revision, permissions);
+    }
+
+    @Override
+    public Set<ControllerServiceEntity> getControllerServices(final String 
groupId, final boolean includeAncestorGroups, final boolean 
includeDescendantGroups) {
+        return getControllerServices(groupId, includeAncestorGroups, 
includeDescendantGroups, NiFiUserUtils.getNiFiUser());
+    }
+
+    @Override
+    public Set<ControllerServiceEntity> getControllerServices(final String 
groupId, final boolean includeAncestorGroups, final boolean 
includeDescendantGroups, final NiFiUser user) {
+        final Set<ControllerServiceNode> serviceNodes = 
controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, 
includeDescendantGroups);
         final Set<String> serviceIds = serviceNodes.stream().map(service -> 
service.getIdentifier()).collect(Collectors.toSet());
 
         return serviceNodes.stream()
-            .map(serviceNode -> createControllerServiceEntity(serviceNode, 
serviceIds))
+            .map(serviceNode -> createControllerServiceEntity(serviceNode, 
serviceIds, user))
             .collect(Collectors.toSet());
     }
 
     @Override
     public ControllerServiceEntity getControllerService(final String 
controllerServiceId) {
         final ControllerServiceNode controllerService = 
controllerServiceDAO.getControllerService(controllerServiceId);
-        return createControllerServiceEntity(controllerService, 
Sets.newHashSet(controllerServiceId));
+        return createControllerServiceEntity(controllerService, 
Sets.newHashSet(controllerServiceId), NiFiUserUtils.getNiFiUser());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 3d78741..1a50d04 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -608,6 +608,11 @@ public abstract class ApplicationResource {
             serviceFacade.authorizeAccess(authorizer);
             serviceFacade.verifyRevision(revision, user);
 
+            // verify if necessary
+            if (verifier != null) {
+                verifier.run();
+            }
+
             return action.apply(revision, entity);
         }
     }
@@ -657,6 +662,11 @@ public abstract class ApplicationResource {
             serviceFacade.authorizeAccess(authorizer);
             serviceFacade.verifyRevisions(revisions, user);
 
+            // verify if necessary
+            if (verifier != null) {
+                verifier.run();
+            }
+
             return action.apply(revisions, entity);
         }
     }
@@ -820,16 +830,16 @@ public abstract class ApplicationResource {
         }
     }
 
-    /**
-     * Replicates the request to the given node
-     *
-     * @param method   the HTTP method
-     * @param nodeUuid the UUID of the node to replicate the request to
-     * @return the response from the node
-     * @throws UnknownNodeException if the nodeUuid given does not map to any 
node in the cluster
-     */
-    protected Response replicate(final String method, final String nodeUuid) {
-        return replicate(method, getRequestParameters(), nodeUuid);
+
+    private void ensureFlowInitialized() {
+        if (!flowController.isInitialized()) {
+            throw new IllegalClusterStateException("Cluster is still in the 
process of voting on the appropriate Data Flow.");
+        }
+    }
+
+    protected Response replicate(final String method, final Object entity, 
final String nodeUuid, final Map<String, String> headersToOverride) {
+        final URI path = getAbsolutePath();
+        return replicate(path, method, entity, nodeUuid, headersToOverride);
     }
 
     /**
@@ -845,22 +855,16 @@ public abstract class ApplicationResource {
         return replicate(method, entity, nodeUuid, null);
     }
 
-    private void ensureFlowInitialized() {
-        if (!flowController.isInitialized()) {
-            throw new IllegalClusterStateException("Cluster is still in the 
process of voting on the appropriate Data Flow.");
-        }
-    }
-
     /**
      * Replicates the request to the given node
      *
-     * @param method   the HTTP method
-     * @param entity   the Entity to replicate
+     * @param method the HTTP method
+     * @param entity the Entity to replicate
      * @param nodeUuid the UUID of the node to replicate the request to
      * @return the response from the node
      * @throws UnknownNodeException if the nodeUuid given does not map to any 
node in the cluster
      */
-    protected Response replicate(final String method, final Object entity, 
final String nodeUuid, final Map<String, String> headersToOverride) {
+    protected Response replicate(final URI path, final String method, final 
Object entity, final String nodeUuid, final Map<String, String> 
headersToOverride) {
         // since we're cluster we must specify the cluster node identifier
         if (nodeUuid == null) {
             throw new IllegalArgumentException("The cluster node identifier 
must be specified.");
@@ -873,7 +877,6 @@ public abstract class ApplicationResource {
 
         ensureFlowInitialized();
 
-        final URI path = getAbsolutePath();
         try {
             final Map<String, String> headers = headersToOverride == null ? 
getHeaders() : getHeaders(headersToOverride);
 
@@ -996,6 +999,12 @@ public abstract class ApplicationResource {
         }
     }
 
+
+    protected NodeResponse replicateNodeResponse(final String method, final 
Object entity, final Map<String, String> headersToOverride) throws 
InterruptedException {
+        final URI path = getAbsolutePath();
+        return replicateNodeResponse(path, method, entity, headersToOverride);
+    }
+
     /**
      * Replicates the request to all nodes in the cluster using the provided 
method and entity. The headers
      * used will be those provided by the {@link #getHeaders()} method. The 
URI that will be used will be
@@ -1009,10 +1018,9 @@ public abstract class ApplicationResource {
      * @throws InterruptedException if interrupted while replicating the 
request
      * @see #replicate(String, Object, Map)
      */
-    protected NodeResponse replicateNodeResponse(final String method, final 
Object entity, final Map<String, String> headersToOverride) throws 
InterruptedException {
+    protected NodeResponse replicateNodeResponse(final URI path, final String 
method, final Object entity, final Map<String, String> headersToOverride) 
throws InterruptedException {
         ensureFlowInitialized();
 
-        final URI path = getAbsolutePath();
         final Map<String, String> headers = headersToOverride == null ? 
getHeaders() : getHeaders(headersToOverride);
 
         // Determine whether we should replicate only to the cluster 
coordinator, or if we should replicate directly

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 889676c..5d5a796 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -36,6 +36,8 @@ import 
org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.nar.NarClassLoaders;
 import org.apache.nifi.util.NiFiProperties;
@@ -61,6 +63,7 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
 import org.apache.nifi.web.api.entity.AboutEntity;
 import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
 import org.apache.nifi.web.api.entity.BannerEntity;
 import org.apache.nifi.web.api.entity.BulletinBoardEntity;
 import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
@@ -118,6 +121,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -388,7 +392,7 @@ public class FlowResource extends ApplicationResource {
         }
 
         // get all the controller services
-        final Set<ControllerServiceEntity> controllerServices = 
serviceFacade.getControllerServices(null);
+        final Set<ControllerServiceEntity> controllerServices = 
serviceFacade.getControllerServices(null, false, false);
         
controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices);
 
         // create the response entity
@@ -426,11 +430,10 @@ public class FlowResource extends ApplicationResource {
             }
     )
     public Response getControllerServicesFromGroup(
-            @ApiParam(
-                    value = "The process group id.",
-                    required = true
-            )
-            @PathParam("id") String groupId) throws InterruptedException {
+            @ApiParam(value = "The process group id.", required = true) 
@PathParam("id") String groupId,
+            @ApiParam("Whether or not to include parent/ancestory process 
groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean 
includeAncestorGroups,
+            @ApiParam("Whether or not to include descendant process groups") 
@QueryParam("includeDescendantGroups") @DefaultValue("false") boolean 
includeDescendantGroups
+            ) throws InterruptedException {
 
         authorizeFlow();
 
@@ -439,7 +442,7 @@ public class FlowResource extends ApplicationResource {
         }
 
         // get all the controller services
-        final Set<ControllerServiceEntity> controllerServices = 
serviceFacade.getControllerServices(groupId);
+        final Set<ControllerServiceEntity> controllerServices = 
serviceFacade.getControllerServices(groupId, includeAncestorGroups, 
includeDescendantGroups);
         
controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices);
 
         // create the response entity
@@ -512,7 +515,7 @@ public class FlowResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("process-groups/{id}")
     @ApiOperation(
-            value = "Schedule or unschedule comopnents in the specified 
Process Group.",
+            value = "Schedule or unschedule components in the specified 
Process Group.",
             response = ScheduleComponentsEntity.class,
             authorizations = {
                     @Authorization(value = "Read - /flow", type = ""),
@@ -570,7 +573,7 @@ public class FlowResource extends ApplicationResource {
 
                 // ensure authorized for each processor we will attempt to 
schedule
                 group.findAllProcessors().stream()
-                        .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS)
+                    .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS)
                         .filter(processor -> 
processor.isAuthorized(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser()))
                         .forEach(processor -> {
                             componentIds.add(processor.getIdentifier());
@@ -578,7 +581,7 @@ public class FlowResource extends ApplicationResource {
 
                 // ensure authorized for each input port we will attempt to 
schedule
                 group.findAllInputPorts().stream()
-                        .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+                    .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
                         .filter(inputPort -> 
inputPort.isAuthorized(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser()))
                         .forEach(inputPort -> {
                             componentIds.add(inputPort.getIdentifier());
@@ -586,7 +589,7 @@ public class FlowResource extends ApplicationResource {
 
                 // ensure authorized for each output port we will attempt to 
schedule
                 group.findAllOutputPorts().stream()
-                        .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+                    .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
                         .filter(outputPort -> 
outputPort.isAuthorized(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser()))
                         .forEach(outputPort -> {
                             componentIds.add(outputPort.getIdentifier());
@@ -640,7 +643,129 @@ public class FlowResource extends ApplicationResource {
                             
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 e -> getRevision(e.getValue(), e.getKey())));
 
                     // update the process group
-                    final ScheduleComponentsEntity entity = 
serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
+                final ScheduleComponentsEntity entity = 
serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
+                    return generateOkResponse(entity).build();
+                }
+        );
+    }
+
+
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("process-groups/{id}/controller-services")
+    @ApiOperation(value = "Enable or disable Controller Services in the 
specified Process Group.",
+        response = ActivateControllerServicesEntity.class,
+        authorizations = {
+            @Authorization(value = "Read - /flow", type = ""),
+            @Authorization(value = "Write - /{component-type}/{uuid} - For 
every service being enabled/disabled", type = "")
+        })
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful.")
+            }
+    )
+    public Response activateControllerServices(
+            @Context HttpServletRequest httpServletRequest,
+            @ApiParam(value = "The process group id.", required = true)
+            @PathParam("id") String id,
+            @ApiParam(value = "The request to schedule or unschedule. If the 
comopnents in the request are not specified, all authorized components will be 
considered.", required = true)
+            final ActivateControllerServicesEntity requestEntity) {
+
+        // ensure the same id is being used
+        if (!id.equals(requestEntity.getId())) {
+            throw new IllegalArgumentException(String.format("The process 
group id (%s) in the request body does "
+                + "not equal the process group id of the requested resource 
(%s).", requestEntity.getId(), id));
+        }
+
+        final ControllerServiceState state;
+        if (requestEntity.getState() == null) {
+            throw new IllegalArgumentException("The controller service state 
must be specified.");
+        } else {
+            try {
+                state = 
ControllerServiceState.valueOf(requestEntity.getState());
+            } catch (final IllegalArgumentException iae) {
+                throw new IllegalArgumentException(String.format("The 
controller service state must be one of [%s].",
+                    
StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, 
ControllerServiceState.DISABLED), ", ")));
+            }
+        }
+
+        // ensure its a supported scheduled state
+        if (ControllerServiceState.DISABLING.equals(state) || 
ControllerServiceState.ENABLING.equals(state)) {
+            throw new IllegalArgumentException(String.format("The scheduled 
must be one of [%s].",
+                StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, 
ControllerServiceState.DISABLED), ", ")));
+        }
+
+        // if the components are not specified, gather all components and 
their current revision
+        if (requestEntity.getComponents() == null) {
+            // get the current revisions for the components being updated
+            final Set<Revision> revisions = 
serviceFacade.getRevisionsFromGroup(id, group -> {
+                final Set<String> componentIds = new HashSet<>();
+
+                final Predicate<ControllerServiceNode> filter;
+                if (ControllerServiceState.ENABLED.equals(state)) {
+                    filter = service -> !service.isActive() && 
service.isValid();
+                } else {
+                    filter = service -> service.isActive();
+                }
+
+                group.findAllControllerServices().stream()
+                    .filter(filter)
+                    .filter(service -> service.isAuthorized(authorizer, 
RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
+                    .forEach(service -> 
componentIds.add(service.getIdentifier()));
+                return componentIds;
+            });
+
+            // build the component mapping
+            final Map<String, RevisionDTO> componentsToSchedule = new 
HashMap<>();
+            revisions.forEach(revision -> {
+                final RevisionDTO dto = new RevisionDTO();
+                dto.setClientId(revision.getClientId());
+                dto.setVersion(revision.getVersion());
+                componentsToSchedule.put(revision.getComponentId(), dto);
+            });
+
+            // set the components and their current revision
+            requestEntity.setComponents(componentsToSchedule);
+        }
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.PUT, requestEntity);
+        }
+
+        final Map<String, RevisionDTO> requestComponentsToSchedule = 
requestEntity.getComponents();
+        final Map<String, Revision> requestComponentRevisions =
+                
requestComponentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 e -> getRevision(e.getValue(), e.getKey())));
+        final Set<Revision> requestRevisions = new 
HashSet<>(requestComponentRevisions.values());
+
+        return withWriteLock(
+                serviceFacade,
+                requestEntity,
+                requestRevisions,
+                lookup -> {
+                    // ensure access to the flow
+                    authorizeFlow();
+
+                    // ensure access to every component being scheduled
+                    requestComponentsToSchedule.keySet().forEach(componentId 
-> {
+                        final Authorizable authorizable = 
lookup.getControllerService(componentId).getAuthorizable();
+                        authorizable.authorize(authorizer, 
RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+                    });
+                },
+                () -> serviceFacade.verifyActivateControllerServices(id, 
state, requestComponentRevisions.keySet()),
+                (revisions, scheduleComponentsEntity) -> {
+                final ControllerServiceState serviceState = 
ControllerServiceState.valueOf(scheduleComponentsEntity.getState());
+
+                    final Map<String, RevisionDTO> componentsToSchedule = 
scheduleComponentsEntity.getComponents();
+                    final Map<String, Revision> componentRevisions =
+                            
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 e -> getRevision(e.getValue(), e.getKey())));
+
+                    // update the controller services
+                final ActivateControllerServicesEntity entity = 
serviceFacade.activateControllerServices(id, serviceState, componentRevisions);
                     return generateOkResponse(entity).build();
                 }
         );

Reply via email to