Repository: nifi Updated Branches: refs/heads/master 6fbe1515e -> b7272e3f3
http://git-wip-us.apache.org/repos/asf/nifi/blob/b7272e3f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java index 28fef0f..a2243b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java @@ -18,6 +18,7 @@ package org.apache.nifi.web.util; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; @@ -65,7 +66,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle @Override - public Set<AffectedComponentEntity> scheduleComponents(final URI exampleUri, final NiFiUser user, final String groupId, final Set<AffectedComponentEntity> components, + public Set<AffectedComponentEntity> scheduleComponents(final URI exampleUri, final String groupId, final Set<AffectedComponentEntity> components, final ScheduledState desiredState, final Pause pause) throws LifecycleManagementException { final Set<String> componentIds = components.stream() @@ -95,6 +96,8 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle final Map<String, String> headers = new HashMap<>(); headers.put("content-type", MediaType.APPLICATION_JSON); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. try { final NodeResponse clusterResponse; @@ -122,7 +125,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle } final Set<AffectedComponentEntity> updatedEntities = components.stream() - .map(component -> AffectedComponentUtils.updateEntity(component, serviceFacade, dtoFactory, user)) + .map(component -> AffectedComponentUtils.updateEntity(component, serviceFacade, dtoFactory)) .collect(Collectors.toSet()); return updatedEntities; } @@ -274,7 +277,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle @Override - public Set<AffectedComponentEntity> activateControllerServices(final URI originalUri, final NiFiUser user, final String groupId, final Set<AffectedComponentEntity> affectedServices, + public Set<AffectedComponentEntity> activateControllerServices(final URI originalUri, final String groupId, final Set<AffectedComponentEntity> affectedServices, final ControllerServiceState desiredState, final Pause pause) throws LifecycleManagementException { final Set<String> affectedServiceIds = affectedServices.stream() @@ -301,6 +304,8 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle final Map<String, String> headers = new HashMap<>(); headers.put("content-type", MediaType.APPLICATION_JSON); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. try { final NodeResponse clusterResponse; @@ -328,7 +333,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle } return affectedServices.stream() - .map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId(), user)) + .map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId())) .map(dtoFactory::createAffectedComponentEntity) .collect(Collectors.toSet()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/b7272e3f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java index c84b966..687c370 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java @@ -17,20 +17,18 @@ package org.apache.nifi.web.util; -import java.net.URI; -import java.util.Set; - -import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.entity.AffectedComponentEntity; +import java.net.URI; +import java.util.Set; + public interface ComponentLifecycle { /** * Updates the scheduled state of all components that are given, to match the desired ScheduledState * * @param exampleUri an URI to use as a base for the REST API. - * @param user the user making the request * @param groupId the ID of the process group * @param components the components to schedule or unschedule * @param desiredState the desired state of the components @@ -40,14 +38,13 @@ public interface ComponentLifecycle { * * @throws IllegalStateException if any of the components given do not have a state that can be transitioned to the given desired state */ - Set<AffectedComponentEntity> scheduleComponents(URI exampleUri, NiFiUser user, String groupId, Set<AffectedComponentEntity> components, + Set<AffectedComponentEntity> scheduleComponents(URI exampleUri, String groupId, Set<AffectedComponentEntity> components, ScheduledState desiredState, Pause pause) throws LifecycleManagementException; /** * Updates the Controller Service State state of all controller services that are given, to match the desired ControllerServiceState * * @param exampleUri an URI to use as a base for the REST API - * @param user the user making the request * @param groupId the ID of the process group * @param services the controller services to enable or disable * @param desiredState the desired state of the components @@ -57,6 +54,6 @@ public interface ComponentLifecycle { * * @throws IllegalStateException if any of the components given do not have a state that can be transitioned to the given desired state */ - Set<AffectedComponentEntity> activateControllerServices(URI exampleUri, NiFiUser user, String groupId, Set<AffectedComponentEntity> services, + Set<AffectedComponentEntity> activateControllerServices(URI exampleUri, String groupId, Set<AffectedComponentEntity> services, ControllerServiceState desiredState, Pause pause) throws LifecycleManagementException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/b7272e3f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java index 1c7e82d..8a08684 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java @@ -17,7 +17,6 @@ package org.apache.nifi.web.util; -import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceNode; @@ -51,7 +50,7 @@ public class LocalComponentLifecycle implements ComponentLifecycle { private DtoFactory dtoFactory; @Override - public Set<AffectedComponentEntity> scheduleComponents(final URI exampleUri, final NiFiUser user, final String groupId, final Set<AffectedComponentEntity> components, + public Set<AffectedComponentEntity> scheduleComponents(final URI exampleUri, final String groupId, final Set<AffectedComponentEntity> components, final ScheduledState desiredState, final Pause pause) throws LifecycleManagementException { final Map<String, Revision> processorRevisions = components.stream() @@ -61,19 +60,19 @@ public class LocalComponentLifecycle implements ComponentLifecycle { .collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity())); if (desiredState == ScheduledState.RUNNING) { - startComponents(groupId, processorRevisions, affectedComponentMap, user, pause); + startComponents(groupId, processorRevisions, affectedComponentMap, pause); } else { - stopComponents(groupId, processorRevisions, affectedComponentMap, user, pause); + stopComponents(groupId, processorRevisions, affectedComponentMap, pause); } final Set<AffectedComponentEntity> updatedEntities = components.stream() - .map(component -> AffectedComponentUtils.updateEntity(component, serviceFacade, dtoFactory, user)) + .map(component -> AffectedComponentUtils.updateEntity(component, serviceFacade, dtoFactory)) .collect(Collectors.toSet()); return updatedEntities; } @Override - public Set<AffectedComponentEntity> activateControllerServices(final URI exampleUri, final NiFiUser user, final String groupId, final Set<AffectedComponentEntity> services, + public Set<AffectedComponentEntity> activateControllerServices(final URI exampleUri, final String groupId, final Set<AffectedComponentEntity> services, final ControllerServiceState desiredState, final Pause pause) throws LifecycleManagementException { final Map<String, Revision> serviceRevisions = services.stream() @@ -83,20 +82,19 @@ public class LocalComponentLifecycle implements ComponentLifecycle { .collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity())); if (desiredState == ControllerServiceState.ENABLED) { - enableControllerServices(groupId, serviceRevisions, affectedServiceMap, user, pause); + enableControllerServices(groupId, serviceRevisions, affectedServiceMap, pause); } else { - disableControllerServices(groupId, serviceRevisions, affectedServiceMap, user, pause); + disableControllerServices(groupId, serviceRevisions, affectedServiceMap, pause); } return services.stream() - .map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId(), user)) + .map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId())) .map(dtoFactory::createAffectedComponentEntity) .collect(Collectors.toSet()); } - private void startComponents(final String processGroupId, final Map<String, Revision> componentRevisions, final Map<String, AffectedComponentEntity> affectedComponents, - final NiFiUser user, final Pause pause) { + private void startComponents(final String processGroupId, final Map<String, Revision> componentRevisions, final Map<String, AffectedComponentEntity> affectedComponents, final Pause pause) { if (componentRevisions.isEmpty()) { return; @@ -105,15 +103,14 @@ public class LocalComponentLifecycle implements ComponentLifecycle { logger.debug("Starting components with ID's {} from Process Group {}", componentRevisions.keySet(), processGroupId); serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, componentRevisions.keySet()); - serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.RUNNING, componentRevisions); + serviceFacade.scheduleComponents(processGroupId, ScheduledState.RUNNING, componentRevisions); // wait for all of the Processors to reach the desired state. We don't have to wait for other components because // Local and Remote Ports as well as funnels start immediately. waitForProcessorState(processGroupId, affectedComponents, ScheduledState.RUNNING, pause); } - private void stopComponents(final String processGroupId, final Map<String, Revision> componentRevisions, final Map<String, AffectedComponentEntity> affectedComponents, - final NiFiUser user, final Pause pause) { + private void stopComponents(final String processGroupId, final Map<String, Revision> componentRevisions, final Map<String, AffectedComponentEntity> affectedComponents, final Pause pause) { if (componentRevisions.isEmpty()) { return; @@ -122,7 +119,7 @@ public class LocalComponentLifecycle implements ComponentLifecycle { logger.debug("Stopping components with ID's {} from Process Group {}", componentRevisions.keySet(), processGroupId); serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, componentRevisions.keySet()); - serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.STOPPED, componentRevisions); + serviceFacade.scheduleComponents(processGroupId, ScheduledState.STOPPED, componentRevisions); // wait for all of the Processors to reach the desired state. We don't have to wait for other components because // Local and Remote Ports as well as funnels stop immediately. @@ -205,8 +202,7 @@ public class LocalComponentLifecycle implements ComponentLifecycle { } - private void enableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, - final NiFiUser user, final Pause pause) { + private void enableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, final Pause pause) { if (serviceRevisions.isEmpty()) { return; @@ -215,12 +211,11 @@ public class LocalComponentLifecycle implements ComponentLifecycle { logger.debug("Enabling Controller Services with ID's {} from Process Group {}", serviceRevisions.keySet(), processGroupId); serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, affectedServices.keySet()); - serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.ENABLED, serviceRevisions); - waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.ENABLED, pause, user); + serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions); + waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.ENABLED, pause); } - private void disableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, - final NiFiUser user, final Pause pause) { + private void disableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, final Pause pause) { if (serviceRevisions.isEmpty()) { return; @@ -229,8 +224,8 @@ public class LocalComponentLifecycle implements ComponentLifecycle { logger.debug("Disabling Controller Services with ID's {} from Process Group {}", serviceRevisions.keySet(), processGroupId); serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, affectedServices.keySet()); - serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.DISABLED, serviceRevisions); - waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause, user); + serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions); + waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause); } static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) { @@ -280,17 +275,15 @@ public class LocalComponentLifecycle implements ComponentLifecycle { * @param affectedServices all Controller Services whose state should be equal to the given desired state * @param desiredState the desired state for all services with the ID's given * @param pause the Pause that can be used to wait between polling - * @param user the user that is retrieving the controller services * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state */ - private boolean waitForControllerServiceState(final String groupId, final Map<String, AffectedComponentEntity> affectedServices, final ControllerServiceState desiredState, final Pause pause, - final NiFiUser user) { + private boolean waitForControllerServiceState(final String groupId, final Map<String, AffectedComponentEntity> affectedServices, final ControllerServiceState desiredState, final Pause pause) { logger.debug("Waiting for {} Controller Services to transition their states to {}", affectedServices.size(), desiredState); boolean continuePolling = true; while (continuePolling) { - final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true, user); + final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true); // update the affected controller services updateAffectedControllerServices(serviceEntities, affectedServices);
