http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java new file mode 100644 index 0000000..c41d13b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.util; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response.Status; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.api.ApplicationResource.ReplicationTarget; +import org.apache.nifi.web.api.dto.AffectedComponentDTO; +import org.apache.nifi.web.api.dto.DtoFactory; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServicesEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ProcessorsEntity; +import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.jersey.core.util.MultivaluedMapImpl; + +public class ClusterReplicationComponentLifecycle implements ComponentLifecycle { + private static final Logger logger = 
LoggerFactory.getLogger(ClusterReplicationComponentLifecycle.class); + + private ClusterCoordinator clusterCoordinator; + private RequestReplicator requestReplicator; + private NiFiServiceFacade serviceFacade; + private DtoFactory dtoFactory; + + + @Override + public Set<AffectedComponentEntity> scheduleComponents(final URI exampleUri, final NiFiUser user, final String groupId, final Set<AffectedComponentEntity> components, + final ScheduledState desiredState, final Pause pause) throws LifecycleManagementException { + + final Set<String> componentIds = components.stream() + .map(component -> component.getId()) + .collect(Collectors.toSet()); + + final Map<String, AffectedComponentEntity> componentMap = components.stream() + .collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity())); + + final Map<String, Revision> componentRevisionMap = getRevisions(groupId, componentIds); + final Map<String, RevisionDTO> componentRevisionDtoMap = componentRevisionMap.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue()))); + + final ScheduleComponentsEntity scheduleProcessorsEntity = new ScheduleComponentsEntity(); + scheduleProcessorsEntity.setComponents(componentRevisionDtoMap); + scheduleProcessorsEntity.setId(groupId); + scheduleProcessorsEntity.setState(desiredState.name()); + + URI scheduleGroupUri; + try { + scheduleGroupUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(), + exampleUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, exampleUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + final Map<String, String> headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); + + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. 
+ try { + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), user, HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse(); + } + + final int scheduleComponentStatus = clusterResponse.getStatus(); + if (scheduleComponentStatus != Status.OK.getStatusCode()) { + throw new LifecycleManagementException("Failed to transition components to a state of " + desiredState); + } + + final boolean processorsTransitioned = waitForProcessorStatus(user, exampleUri, groupId, componentMap, desiredState, pause); + + if (!processorsTransitioned) { + throw new LifecycleManagementException("Failed while waiting for components to transition to state of " + desiredState); + } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new LifecycleManagementException("Interrupted while attempting to transition components to state of " + desiredState); + } + + final Set<AffectedComponentEntity> updatedEntities = components.stream() + .map(component -> AffectedComponentUtils.updateEntity(component, serviceFacade, dtoFactory, user)) + .collect(Collectors.toSet()); + return updatedEntities; + } + + + private ReplicationTarget getReplicationTarget() { + return clusterCoordinator.isActiveClusterCoordinator() ? 
ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR; + } + + private RequestReplicator getRequestReplicator() { + return requestReplicator; + } + + protected NodeIdentifier getClusterCoordinatorNode() { + final NodeIdentifier activeClusterCoordinator = clusterCoordinator.getElectedActiveCoordinatorNode(); + if (activeClusterCoordinator != null) { + return activeClusterCoordinator; + } + + throw new NoClusterCoordinatorException(); + } + + private Map<String, Revision> getRevisions(final String groupId, final Set<String> componentIds) { + final Set<Revision> processorRevisions = serviceFacade.getRevisionsFromGroup(groupId, group -> componentIds); + return processorRevisions.stream().collect(Collectors.toMap(revision -> revision.getComponentId(), Function.identity())); + } + + /** + * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State. + * + * @param user the user making the request + * @param groupId the ID of the Process Group to poll + * @param processorIds the ID of all Processors whose state should be equal to the given desired state + * @param desiredState the desired state for all processors with the ID's given + * @param pause the Pause that can be used to wait between polling + * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state + */ + private boolean waitForProcessorStatus(final NiFiUser user, final URI originalUri, final String groupId, final Map<String, AffectedComponentEntity> processors, + final ScheduledState desiredState, final Pause pause) throws InterruptedException { + URI groupUri; + try { + groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new 
RuntimeException(e); + } + + final Map<String, String> headers = new HashMap<>(); + final MultivaluedMap<String, String> requestEntity = new MultivaluedMapImpl(); + + boolean continuePolling = true; + while (continuePolling) { + + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + return false; + } + + final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class); + final Set<ProcessorEntity> processorEntities = processorsEntity.getProcessors(); + + if (isProcessorActionComplete(processorEntities, processors, desiredState)) { + logger.debug("All {} processors of interest now have the desired state of {}", processors.size(), desiredState); + return true; + } + + // Not all of the processors are in the desired state. Pause for a bit and poll again. + continuePolling = pause.pause(); + } + + return false; + } + + /** + * Extracts the response entity from the specified node response. 
+ * + * @param nodeResponse node response + * @param clazz class + * @param <T> type of class + * @return the response entity + */ + @SuppressWarnings("unchecked") + private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) { + T entity = (T) nodeResponse.getUpdatedEntity(); + if (entity == null) { + entity = nodeResponse.getClientResponse().getEntity(clazz); + } + return entity; + } + + + private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents, final ScheduledState desiredState) { + + final String desiredStateName = desiredState.name(); + + // update the affected processors + processorEntities.stream() + .filter(entity -> affectedComponents.containsKey(entity.getId())) + .forEach(entity -> { + final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId()); + affectedComponentEntity.setRevision(entity.getRevision()); + + // only consider update this component if the user had permissions to it + if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { + final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); + affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus()); + affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount()); + + if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { + affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); + } + } + }); + + final boolean allProcessorsMatch = processorEntities.stream() + .filter(entity -> affectedComponents.containsKey(entity.getId())) + .allMatch(entity -> { + final ProcessorStatusDTO status = entity.getStatus(); + + final String runStatus = status.getAggregateSnapshot().getRunStatus(); + final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus); + if (!stateMatches) { + return 
false; + } + + if (desiredState == ScheduledState.STOPPED && status.getAggregateSnapshot().getActiveThreadCount() != 0) { + return false; + } + + return true; + }); + + if (!allProcessorsMatch) { + return false; + } + + return true; + } + + + + @Override + public Set<AffectedComponentEntity> activateControllerServices(final URI originalUri, final NiFiUser user, final String groupId, final Set<AffectedComponentEntity> affectedServices, + final ControllerServiceState desiredState, final Pause pause) throws LifecycleManagementException { + + final Set<String> affectedServiceIds = affectedServices.stream() + .map(component -> component.getId()) + .collect(Collectors.toSet()); + + final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds); + final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue()))); + + final ActivateControllerServicesEntity activateServicesEntity = new ActivateControllerServicesEntity(); + activateServicesEntity.setComponents(serviceRevisionDtoMap); + activateServicesEntity.setId(groupId); + activateServicesEntity.setState(desiredState.name()); + + URI controllerServicesUri; + try { + controllerServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + final Map<String, String> headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); + + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. 
+ try { + final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse(); + } + + final int disableServicesStatus = clusterResponse.getStatus(); + if (disableServicesStatus != Status.OK.getStatusCode()) { + throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState); + } + + final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, affectedServiceIds, desiredState, pause); + + if (!serviceTransitioned) { + throw new LifecycleManagementException("Failed while waiting for Controller Services to finish transitioning to a state of " + desiredState); + } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new LifecycleManagementException("Interrupted while transitioning Controller Services to a state of " + desiredState); + } + + return affectedServices.stream() + .map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId(), user)) + .map(dtoFactory::createAffectedComponentEntity) + .collect(Collectors.toSet()); + } + + /** + * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. 
+ * + * @param user the user making the request + * @param groupId the ID of the Process Group to poll + * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state + * @param desiredState the desired state for all services with the ID's given + * @param pause the Pause that can be used to wait between polling + * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state + */ + private boolean waitForControllerServiceStatus(final NiFiUser user, final URI originalUri, final String groupId, final Set<String> serviceIds, + final ControllerServiceState desiredState, final Pause pause) throws InterruptedException { + + URI groupUri; + try { + groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false,includeDescendantGroups=true", originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + final Map<String, String> headers = new HashMap<>(); + final MultivaluedMap<String, String> requestEntity = new MultivaluedMapImpl(); + + boolean continuePolling = true; + while (continuePolling) { + + // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves. 
+ final NodeResponse clusterResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse(); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + return false; + } + + final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class); + final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices(); + + final Map<String, AffectedComponentEntity> affectedServices = serviceEntities.stream() + .collect(Collectors.toMap(ControllerServiceEntity::getId, dtoFactory::createAffectedComponentEntity)); + + // update the affected controller services + updateAffectedControllerServices(serviceEntities, affectedServices); + + final String desiredStateName = desiredState.name(); + final boolean allServicesMatch = serviceEntities.stream() + .map(entity -> entity.getComponent()) + .filter(service -> serviceIds.contains(service.getId())) + .map(service -> service.getState()) + .allMatch(state -> state.equals(desiredStateName)); + + if (allServicesMatch) { + logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState); + return true; + } + + // Not all of the processors are in the desired state. Pause for a bit and poll again. + continuePolling = pause.pause(); + } + + return false; + } + + + /** + * Updates the affected controller services in the specified updateRequest with the serviceEntities. 
+ * + * @param serviceEntities service entities + * @param updateRequest update request + */ + private void updateAffectedControllerServices(final Set<ControllerServiceEntity> serviceEntities, final Map<String, AffectedComponentEntity> affectedServices) { + // update the affected components + serviceEntities.stream() + .filter(entity -> affectedServices.containsKey(entity.getId())) + .forEach(entity -> { + final AffectedComponentEntity affectedComponentEntity = affectedServices.get(entity.getId()); + affectedComponentEntity.setRevision(entity.getRevision()); + + // only consider update this component if the user had permissions to it + if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { + final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); + affectedComponent.setState(entity.getComponent().getState()); + + if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { + affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); + } + } + }); + } + + public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { + this.clusterCoordinator = clusterCoordinator; + } + + public void setRequestReplicator(final RequestReplicator requestReplicator) { + this.requestReplicator = requestReplicator; + } + + public void setDtoFactory(final DtoFactory dtoFactory) { + this.dtoFactory = dtoFactory; + } + + public void setServiceFacade(final NiFiServiceFacade serviceFacade) { + this.serviceFacade = serviceFacade; + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java new file mode 100644 index 0000000..c84b966 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.util; + +import java.net.URI; +import java.util.Set; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; + +public interface ComponentLifecycle { + /** + * Updates the scheduled state of all components that are given, to match the desired ScheduledState + * + * @param exampleUri a URI to use as a base for the REST API. + * @param user the user making the request + * @param groupId the ID of the process group + * @param components the components to schedule or unschedule + * @param desiredState the desired state of the components + * @param pause a pause that can be used to determine how long to wait between polling for task completion and that can also be used to cancel the operation + * + * @return the set of all AffectedComponents that are updated by the request, including the new Revisions + * + * @throws IllegalStateException if any of the components given do not have a state that can be transitioned to the given desired state + * @throws LifecycleManagementException if the components could not be transitioned to the desired state + */ + Set<AffectedComponentEntity> scheduleComponents(URI exampleUri, NiFiUser user, String groupId, Set<AffectedComponentEntity> components, + ScheduledState desiredState, Pause pause) throws LifecycleManagementException; + + /** + * Updates the Controller Service state of all controller services that are given, to match the desired ControllerServiceState + * + * @param exampleUri a URI to use as a base for the REST API + * @param user the user making the request + * @param groupId the ID of the process group + * @param services the controller services to enable or disable + * @param desiredState the desired state of the components + * @param pause a pause that can be used to determine how long to wait between polling for task completion and that can also be used to cancel the operation + * + * @return the set
of all AffectedComponents that are updated by the request, including the new Revisions + * + * @throws IllegalStateException if any of the components given do not have a state that can be transitioned to the given desired state + * @throws LifecycleManagementException if the controller services could not be transitioned to the desired state + */ + Set<AffectedComponentEntity> activateControllerServices(URI exampleUri, NiFiUser user, String groupId, Set<AffectedComponentEntity> services, + ControllerServiceState desiredState, Pause pause) throws LifecycleManagementException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LifecycleManagementException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LifecycleManagementException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LifecycleManagementException.java new file mode 100644 index 0000000..1778c6a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LifecycleManagementException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.util; + +/** + * Checked exception declared by the {@link ComponentLifecycle} operations to report that components or controller services could not be transitioned to the requested state. + */ +public class LifecycleManagementException extends Exception { + + /** + * @param message a description of the failure + */ + public LifecycleManagementException(String message) { + super(message); + } + + /** + * @param cause the underlying cause of the failure + */ + public LifecycleManagementException(Throwable cause) { + super(cause); + } + + /** + * @param message a description of the failure + * @param cause the underlying cause of the failure + */ + public LifecycleManagementException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java new file mode 100644 index 0000000..22ffec7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.util; + +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.api.dto.AffectedComponentDTO; +import org.apache.nifi.web.api.dto.DtoFactory; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.revision.RevisionManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link ComponentLifecycle} implementation that schedules components through the {@link NiFiServiceFacade} and then polls the facade until the processors report the requested state. + */ +public class LocalComponentLifecycle implements ComponentLifecycle { + private static final Logger logger = LoggerFactory.getLogger(LocalComponentLifecycle.class); + + private NiFiServiceFacade serviceFacade; + private RevisionManager revisionManager; + private DtoFactory dtoFactory; + + /** + * Starts or stops the given components using the revisions currently held by the RevisionManager, then returns the components refreshed through AffectedComponentUtils.updateEntity. The exampleUri argument is not used by this implementation. + */ + @Override + public Set<AffectedComponentEntity> scheduleComponents(final URI exampleUri, final NiFiUser user, final String groupId, final Set<AffectedComponentEntity> components, + final ScheduledState desiredState, final Pause pause) throws LifecycleManagementException { + + final Map<String, Revision> processorRevisions = components.stream() + .collect(Collectors.toMap(AffectedComponentEntity::getId, entity -> revisionManager.getRevision(entity.getId()))); + + final Map<String, AffectedComponentEntity> affectedComponentMap = components.stream() + .collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity())); + + /* Any desired state other than RUNNING is handled as a request to stop the components. */ + if (desiredState == ScheduledState.RUNNING) { +
startComponents(groupId, processorRevisions, affectedComponentMap, user, pause); + } else { + stopComponents(groupId, processorRevisions, affectedComponentMap, user, pause); + } + + final Set<AffectedComponentEntity> updatedEntities = components.stream() + .map(component -> AffectedComponentUtils.updateEntity(component, serviceFacade, dtoFactory, user)) + .collect(Collectors.toSet()); + return updatedEntities; + } + + /** + * Enables or disables the given controller services using the revisions currently held by the RevisionManager, then returns a freshly created affected-component entity for each service. The exampleUri argument is not used by this implementation. + */ + @Override + public Set<AffectedComponentEntity> activateControllerServices(final URI exampleUri, final NiFiUser user, final String groupId, final Set<AffectedComponentEntity> services, + final ControllerServiceState desiredState, final Pause pause) throws LifecycleManagementException { + + final Map<String, Revision> serviceRevisions = services.stream() + .collect(Collectors.toMap(AffectedComponentEntity::getId, entity -> revisionManager.getRevision(entity.getId()))); + + final Map<String, AffectedComponentEntity> affectedServiceMap = services.stream() + .collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity())); + + /* Any desired state other than ENABLED is handled as a request to disable the services. */ + if (desiredState == ControllerServiceState.ENABLED) { + enableControllerServices(groupId, serviceRevisions, affectedServiceMap, user, pause); + } else { + disableControllerServices(groupId, serviceRevisions, affectedServiceMap, user, pause); + } + + return services.stream() + .map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId(), user)) + .map(dtoFactory::createAffectedComponentEntity) + .collect(Collectors.toSet()); + } + + + /** + * Verifies and schedules the given components to RUNNING through the service facade and waits for the processors to reach that state. Does nothing if no revisions are given. + * NOTE(review): the boolean returned by waitForProcessorState is discarded, so this method returns normally even if polling stops before the processors are running - confirm that this is intended. + */ + private void startComponents(final String processGroupId, final Map<String, Revision> componentRevisions, final Map<String, AffectedComponentEntity> affectedComponents, + final NiFiUser user, final Pause pause) { + + if (componentRevisions.isEmpty()) { + return; + } + + logger.debug("Starting components with ID's {} from Process Group {}", componentRevisions.keySet(), processGroupId); + + serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING,
componentRevisions.keySet()); + serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.RUNNING, componentRevisions); + + // wait for all of the Processors to reach the desired state. We don't have to wait for other components because + // Local and Remote Ports as well as funnels start immediately. + waitForProcessorState(processGroupId, affectedComponents, ScheduledState.RUNNING, pause); + } + + /** + * Verifies and schedules the given components to STOPPED through the service facade and waits for the processors to reach that state. Does nothing if no revisions are given. + * NOTE(review): the boolean returned by waitForProcessorState is discarded, so this method returns normally even if polling stops before the processors are stopped - confirm that this is intended. + */ + private void stopComponents(final String processGroupId, final Map<String, Revision> componentRevisions, final Map<String, AffectedComponentEntity> affectedComponents, + final NiFiUser user, final Pause pause) { + + if (componentRevisions.isEmpty()) { + return; + } + + logger.debug("Stopping components with ID's {} from Process Group {}", componentRevisions.keySet(), processGroupId); + + serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, componentRevisions.keySet()); + serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.STOPPED, componentRevisions); + + // wait for all of the Processors to reach the desired state. We don't have to wait for other components because + // Local and Remote Ports as well as funnels stop immediately. + waitForProcessorState(processGroupId, affectedComponents, ScheduledState.STOPPED, pause); + } + + /** + * Waits for all of the given Processors to reach the given Scheduled State.
+ * + * @return <code>true</code> if all processors have reached the desired state, false if the given {@link Pause} indicates + * to give up before all of the processors have reached the desired state + */ + private boolean waitForProcessorState(final String groupId, final Map<String, AffectedComponentEntity> affectedComponents, + final ScheduledState desiredState, final Pause pause) { + + logger.debug("Waiting for {} processors to transition their states to {}", affectedComponents.size(), desiredState); + + boolean continuePolling = true; + while (continuePolling) { + final Set<ProcessorEntity> processorEntities = serviceFacade.getProcessors(groupId, true); + + if (isProcessorActionComplete(processorEntities, affectedComponents, desiredState)) { + logger.debug("All {} processors of interest now have the desired state of {}", affectedComponents.size(), desiredState); + return true; + } + + // Not all of the processors are in the desired state. Pause for a bit and poll again. + continuePolling = pause.pause(); + } + + return false; + } + + private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents, final ScheduledState desiredState) { + + final String desiredStateName = desiredState.name(); + + // update the affected processors + processorEntities.stream() + .filter(entity -> affectedComponents.containsKey(entity.getId())) + .forEach(entity -> { + final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId()); + affectedComponentEntity.setRevision(entity.getRevision()); + + // only consider updating this component if the user had permissions to it + if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { + final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); + affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus()); + 
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount()); + + if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { + affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); + } + } + }); + + final boolean allProcessorsMatch = processorEntities.stream() + .filter(entity -> affectedComponents.containsKey(entity.getId())) + .allMatch(entity -> { + final ProcessorStatusDTO status = entity.getStatus(); + + final String runStatus = status.getAggregateSnapshot().getRunStatus(); + final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus); + if (!stateMatches) { + return false; + } + + if (desiredState == ScheduledState.STOPPED && status.getAggregateSnapshot().getActiveThreadCount() != 0) { + return false; + } + + return true; + }); + + if (!allProcessorsMatch) { + return false; + } + + return true; + } + + + private void enableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, + final NiFiUser user, final Pause pause) { + + if (serviceRevisions.isEmpty()) { + return; + } + + logger.debug("Enabling Controller Services with ID's {} from Process Group {}", serviceRevisions.keySet(), processGroupId); + + serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, affectedServices.keySet()); + serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.ENABLED, serviceRevisions); + waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.ENABLED, pause, user); + } + + private void disableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, + final NiFiUser user, final Pause pause) { + + if (serviceRevisions.isEmpty()) { + return; + } + + logger.debug("Disabling Controller Services 
with ID's {} from Process Group {}", serviceRevisions.keySet(), processGroupId); + + serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, affectedServices.keySet()); + serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.DISABLED, serviceRevisions); + waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause, user); + } + + /** + * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. + * + * @param groupId the ID of the Process Group to poll + * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state + * @param desiredState the desired state for all services with the ID's given + * @param pause the Pause that can be used to wait between polling + * @param user the user that is retrieving the controller services + * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state + */ + private boolean waitForControllerServiceState(final String groupId, final Map<String, AffectedComponentEntity> affectedServices, final ControllerServiceState desiredState, final Pause pause, + final NiFiUser user) { + + logger.debug("Waiting for {} Controller Services to transition their states to {}", affectedServices.size(), desiredState); + + boolean continuePolling = true; + while (continuePolling) { + final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true, user); + + // update the affected controller services + updateAffectedControllerServices(serviceEntities, affectedServices); + + final String desiredStateName = desiredState.name(); + final boolean allServicesMatch = serviceEntities.stream() + .map(entity -> entity.getComponent()) + .filter(service -> affectedServices.containsKey(service.getId())) + 
.map(service -> service.getState()) + .allMatch(state -> desiredStateName.equals(state)); + + + if (allServicesMatch) { + logger.debug("All {} controller services of interest now have the desired state of {}", affectedServices.size(), desiredState); + return true; + } + + // Not all of the processors are in the desired state. Pause for a bit and poll again. + continuePolling = pause.pause(); + } + + return false; + } + + + /** + * Updates the affected controller services in the specified updateRequest with the serviceEntities. + * + * @param serviceEntities service entities + * @param updateRequest update request + */ + private void updateAffectedControllerServices(final Set<ControllerServiceEntity> serviceEntities, final Map<String, AffectedComponentEntity> affectedServices) { + // update the affected components + serviceEntities.stream() + .filter(entity -> affectedServices.containsKey(entity.getId())) + .forEach(entity -> { + final AffectedComponentEntity affectedComponentEntity = affectedServices.get(entity.getId()); + affectedComponentEntity.setRevision(entity.getRevision()); + + // only consider update this component if the user had permissions to it + if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) { + final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent(); + affectedComponent.setState(entity.getComponent().getState()); + + if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) { + affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors()); + } + } + }); + } + + + public void setServiceFacade(final NiFiServiceFacade serviceFacade) { + this.serviceFacade = serviceFacade; + } + + public void setRevisionManager(final RevisionManager revisionManager) { + this.revisionManager = revisionManager; + } + + public void setDtoFactory(final DtoFactory dtoFactory) { + this.dtoFactory = dtoFactory; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 7fc33cf..48db565 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -61,6 +61,20 @@ <property name="flowController" ref="flowController"/> <property name="accessPolicyDAO" ref="policyBasedAuthorizerDAO"/> </bean> + + <bean id="clusterComponentLifecycle" class="org.apache.nifi.web.util.ClusterReplicationComponentLifecycle"> + <property name="clusterCoordinator" ref="clusterCoordinator" /> + <property name="requestReplicator" ref="requestReplicator" /> + <property name="serviceFacade" ref="serviceFacade" /> + <property name="dtoFactory" ref="dtoFactory" /> + </bean> + + <bean id="localComponentLifecycle" class="org.apache.nifi.web.util.LocalComponentLifecycle"> + <property name="serviceFacade" ref="serviceFacade" /> + <property name="revisionManager" ref="revisionManager" /> + <property name="dtoFactory" ref="dtoFactory" /> + </bean> + <!-- nifi component dao initialization --> <bean id="processGroupDAO" class="org.apache.nifi.web.dao.impl.StandardProcessGroupDAO"> @@ -166,6 +180,7 @@ <property name="heartbeatMonitor" ref="heartbeatMonitor" /> <property name="bulletinRepository" ref="bulletinRepository"/> <property name="leaderElectionManager" ref="leaderElectionManager" /> + <property name="flowRegistryClient" ref="flowRegistryClient" /> </bean> <!-- 
component ui extension configuration context --> @@ -286,6 +301,17 @@ <property name="flowController" ref="flowController" /> <property name="dtoFactory" ref="dtoFactory" /> </bean> + <bean id="versionsResource" class="org.apache.nifi.web.api.VersionsResource" scope="singleton"> + <property name="serviceFacade" ref="serviceFacade" /> + <property name="properties" ref="nifiProperties"/> + <property name="requestReplicator" ref="requestReplicator" /> + <property name="clusterCoordinator" ref="clusterCoordinator"/> + <property name="flowController" ref="flowController" /> + <property name="authorizer" ref="authorizer"/> + <property name="clusterComponentLifecycle" ref="clusterComponentLifecycle" /> + <property name="localComponentLifecycle" ref="localComponentLifecycle" /> + <property name="dtoFactory" ref="dtoFactory" /> + </bean> <bean id="processorResource" class="org.apache.nifi.web.api.ProcessorResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> <property name="properties" ref="nifiProperties"/> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java index 5911120..c9a87a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java @@ -102,14 +102,13 @@ public class NaiveRevisionManager implements RevisionManager { final List<Revision> revisionList = new ArrayList<>(originalClaim.getRevisions()); revisionList.sort(new RevisionComparator()); - String failedId = null; for (final Revision revision : revisionList) { final Revision currentRevision = getRevision(revision.getComponentId()); final boolean verified = revision.equals(currentRevision); if (!verified) { // Throw an Exception indicating that we failed to obtain the locks - throw new InvalidRevisionException("Invalid Revision was given for component with ID '" + failedId + "'"); + throw new InvalidRevisionException("Invalid Revision was given for component with ID '" + revision.getComponentId() + "'"); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index bc92bcd..490862c 100644 --- a/pom.xml +++ b/pom.xml @@ -110,6 +110,7 @@ <hwx.registry.version>0.3.0</hwx.registry.version> <jackson.version>2.9.1</jackson.version> <atlas.version>0.8.1</atlas.version> + <nifi.registry.version>0.0.1-SNAPSHOT</nifi.registry.version> </properties> <repositories> @@ -1630,6 +1631,16 @@ <version>1.5.0-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-data-model</artifactId> + <version>${nifi.registry.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-flow-diff</artifactId> + <version>${nifi.registry.version}</version> + </dependency> + <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> <version>2.0.0</version>
