http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java new file mode 100644 index 0000000..4b87b50 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.concurrent; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.web.ResourceNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AsyncRequestManager<T> implements RequestManager<T> { + private static final Logger logger = LoggerFactory.getLogger(AsyncRequestManager.class); + + private final long requestExpirationMillis; + private final int maxConcurrentRequests; + private final ConcurrentMap<String, AsynchronousWebRequest<T>> requests = new ConcurrentHashMap<>(); + + private final ExecutorService threadPool; + + + public AsyncRequestManager(final int maxConcurrentRequests, final long requestExpirationMillis, final String threadNamePrefix) { + this.requestExpirationMillis = requestExpirationMillis; + this.maxConcurrentRequests = maxConcurrentRequests; + + this.threadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS, + new ArrayBlockingQueue<Runnable>(maxConcurrentRequests), + new ThreadFactory() { + private final AtomicLong counter = new AtomicLong(0L); + + @Override + public Thread newThread(final Runnable r) { + final Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setName(threadNamePrefix + "-" + counter.incrementAndGet()); + thread.setDaemon(true); + return thread; + } + }); + + } + + private String getKey(final String type, final String request) { + return type + "/" + request; + } + + @Override + public void submitRequest(final String type, final String requestId, final AsynchronousWebRequest<T> request, final Consumer<AsynchronousWebRequest<T>> task) { + Objects.requireNonNull(type); + Objects.requireNonNull(requestId); + Objects.requireNonNull(request); + Objects.requireNonNull(task); + + // before adding to the request map, purge any old requests. Must do this by creating a List of ID's + // and then removing those ID's one-at-a-time in order to avoid ConcurrentModificationException. + final Date oneMinuteAgo = new Date(System.currentTimeMillis() - requestExpirationMillis); + final List<String> completedRequestIds = requests.entrySet().stream() + .filter(entry -> entry.getValue().isComplete()) + .filter(entry -> entry.getValue().getLastUpdated().before(oneMinuteAgo)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + + completedRequestIds.stream().forEach(id -> requests.remove(id)); + + final int requestCount = requests.size(); + if (requestCount > maxConcurrentRequests) { + throw new IllegalStateException("There are already " + requestCount + " update requests for variable registries. " + + "Cannot issue any more requests until the older ones are deleted or expire"); + } + + final String key = getKey(type, requestId); + final AsynchronousWebRequest<T> existing = this.requests.putIfAbsent(key, request); + if (existing != null) { + throw new IllegalArgumentException("A requests already exists with this ID and type"); + } + + threadPool.submit(new Runnable() { + @Override + public void run() { + try { + task.accept(request); + } catch (final Exception e) { + logger.error("Failed to perform asynchronous task", e); + request.setFailureReason("Encountered unexpected error when performing asynchronous task: " + e); + request.setLastUpdated(new Date()); + } + } + }); + } + + + @Override + public AsynchronousWebRequest<T> removeRequest(final String type, final String id, final NiFiUser user) { + Objects.requireNonNull(type); + Objects.requireNonNull(id); + Objects.requireNonNull(user); + + final String key = getKey(type, id); + final AsynchronousWebRequest<T> request = requests.get(key); + if (request == null) { + throw new ResourceNotFoundException("Could not find a Request with identifier " + id); + } + + if (!request.getUser().equals(user)) { + throw new IllegalArgumentException("Only the user that submitted the update request can delete it."); + } + + if (!request.isComplete()) { + throw new IllegalStateException("Cannot remove the request because it is not yet complete"); + } + + return requests.remove(key); + } + + @Override + public AsynchronousWebRequest<T> getRequest(final String type, final String id, final NiFiUser user) { + Objects.requireNonNull(type); + Objects.requireNonNull(id); + Objects.requireNonNull(user); + + final String key = getKey(type, id); + final AsynchronousWebRequest<T> request = requests.get(key); + if (request == null) { + throw new ResourceNotFoundException("Could not find a Request with identifier " + id); + } + + if (!request.getUser().equals(user)) { + throw new IllegalArgumentException("Only the user that submitted the update request can delete it."); + } + + return request; + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java new file mode 100644 index 0000000..d09f895 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.concurrent; + +import java.util.Date; + +import org.apache.nifi.authorization.user.NiFiUser; + +public interface AsynchronousWebRequest<T> { + + /** + * @return the ID of the process group that the request is for + */ + String getProcessGroupId(); + + /** + * @return whether or not this request has completed + */ + boolean isComplete(); + + /** + * @return the Date at which the status of this request was last updated + */ + Date getLastUpdated(); + + /** + * Updates the Date at which the status of this request was last updated + * + * @param date the date at which the status of this request was last updated + */ + void setLastUpdated(Date date); + + /** + * @return the user who submitted the request + */ + NiFiUser getUser(); + + /** + * Indicates that this request has completed, successfully or otherwise + * + * @param results the results of the request + */ + void markComplete(T results); + + /** + * Updates the request to indicate the reason that the request failed + * + * @param explanation the reason that the request failed + */ + void setFailureReason(String explanation); + + /** + * Indicates the reason that the request failed, or <code>null</code> if the request has not failed + * + * @param explanation the reason that the request failed, or <code>null</code> if the request has not failed + */ + String getFailureReason(); + + /** + * Returns the results of the request, if it completed successfully, or <code>null</code> if the request either has no completed or failed + * + * @return the results of the request, if it completed successfully, or <code>null</code> if the request either has no completed or failed + */ + T getResults(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java new file mode 100644 index 0000000..580ab47 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.concurrent; + +import java.util.function.Consumer; + +import org.apache.nifi.authorization.user.NiFiUser; + +public interface RequestManager<T> { + + /** + * Submits a request to be performed in the background + * + * @param requestType the type of request to submit. This value can be anything and is used along with the id in order to create + * a composite key for the request so that different request types may easily be managed by the RequestManager. + * @param id the ID of the request + * @param request the request + * @param task the task that should be performed in the background + * + * @throws IllegalArgumentException if a request already exists with the given ID + * @throws NullPointerException if any argument is null + */ + void submitRequest(String requestType, String id, AsynchronousWebRequest<T> request, Consumer<AsynchronousWebRequest<T>> task); + + /** + * Retrieves the request with the given ID + * + * @param requestType the type of the request being retrieved + * @param id the ID of the request + * @param user the user who is retrieving the request + * @return the request with the given ID + * + * @throws ResourceNotFoundException if no request can be found with the given ID + * @throws IllegalArgumentException if the user given is not the user that submitted the request + * @throws NullPointerException if either the ID or the user is null + */ + AsynchronousWebRequest<T> getRequest(String requestType, String id, NiFiUser user); + + /** + * Removes the request with the given ID + * + * @param requestType the type of the request being removed + * @param id the ID of the request + * @param user the user who is retrieving the request + * @return the request with the given ID + * + * @throws ResourceNotFoundException if no request can be found with the given ID + * @throws IllegalArgumentException if the user given is not the user that submitted the request + * @throws IllegalStateException if the request with the given ID is not yet complete + * @throws NullPointerException if either the ID or the user is null + */ + AsynchronousWebRequest<T> removeRequest(String requestType, String id, NiFiUser user); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java new file mode 100644 index 0000000..8ba9a58 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.concurrent; + +import java.util.Date; +import java.util.Objects; + +import org.apache.nifi.authorization.user.NiFiUser; + +public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest<T> { + private final String id; + private final String processGroupId; + private final NiFiUser user; + + private volatile boolean complete = false; + private volatile Date lastUpdated = new Date(); + private volatile String failureReason; + private volatile T results; + + public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user) { + this.id = requestId; + this.processGroupId = processGroupId; + this.user = user; + } + + public String getRequestId() { + return id; + } + + @Override + public boolean isComplete() { + return complete; + } + + @Override + public String getProcessGroupId() { + return processGroupId; + } + + @Override + public void markComplete(final T results) { + this.complete = true; + this.results = results; + this.lastUpdated = new Date(); + } + + @Override + public Date getLastUpdated() { + return lastUpdated; + } + + @Override + public void setLastUpdated(final Date date) { + this.lastUpdated = lastUpdated; + } + + @Override + public NiFiUser getUser() { + return user; + } + + @Override + public void setFailureReason(final String explanation) { + this.failureReason = Objects.requireNonNull(explanation); + this.complete = true; + this.results = null; + } + + @Override + public String getFailureReason() { + return failureReason; + } + + @Override + public T getResults() { + return results; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 58abdea..489e590 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,33 @@ */ package org.apache.nifi.web.api.dto; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -112,6 +139,15 @@ import org.apache.nifi.provenance.lineage.LineageEdge; import org.apache.nifi.provenance.lineage.LineageNode; import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode; import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedLabel; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteProcessGroup; import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; import org.apache.nifi.remote.RemoteGroupPort; @@ -160,42 +196,19 @@ import org.apache.nifi.web.api.entity.AllowableValueEntity; import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ComponentReferenceEntity; import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity; import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.TenantEntity; import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.revision.RevisionManager; -import javax.ws.rs.WebApplicationException; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - public final class DtoFactory { @SuppressWarnings("rawtypes") @@ -616,6 +629,7 @@ public final class DtoFactory { dto.setzIndex(connection.getZIndex()); dto.setSource(createConnectableDto(connection.getSource())); dto.setDestination(createConnectableDto(connection.getDestination())); + dto.setVersionedComponentId(connection.getVersionedComponentId().orElse(null)); dto.setBackPressureObjectThreshold(connection.getFlowFileQueue().getBackPressureObjectThreshold()); dto.setBackPressureDataSizeThreshold(connection.getFlowFileQueue().getBackPressureDataSizeThreshold()); @@ -667,6 +681,7 @@ public final class DtoFactory { dto.setId(connectable.getIdentifier()); dto.setName(isAuthorized ? connectable.getName() : connectable.getIdentifier()); dto.setType(connectable.getConnectableType().name()); + dto.setVersionedComponentId(connectable.getVersionedComponentId().orElse(null)); if (connectable instanceof RemoteGroupPort) { final RemoteGroupPort remoteGroupPort = (RemoteGroupPort) connectable; @@ -708,6 +723,7 @@ public final class DtoFactory { dto.setWidth(label.getSize().getWidth()); dto.setLabel(label.getValue()); dto.setParentGroupId(label.getProcessGroup().getIdentifier()); + dto.setVersionedComponentId(label.getVersionedComponentId().orElse(null)); return dto; } @@ -824,6 +840,7 @@ public final class DtoFactory { dto.setId(funnel.getIdentifier()); dto.setPosition(createPositionDto(funnel.getPosition())); dto.setParentGroupId(funnel.getProcessGroup().getIdentifier()); + dto.setVersionedComponentId(funnel.getVersionedComponentId().orElse(null)); return dto; } @@ -1228,6 +1245,7 @@ public final class DtoFactory { dto.setParentGroupId(port.getProcessGroup().getIdentifier()); dto.setState(port.getScheduledState().toString()); dto.setType(port.getConnectableType().name()); + dto.setVersionedComponentId(port.getVersionedComponentId().orElse(null)); // if this port is on the root group, determine if its actually connected to another nifi if (port instanceof RootGroupPort) { @@ -1354,6 +1372,7 @@ public final class DtoFactory { dto.setDeprecated(controllerServiceNode.isDeprecated()); dto.setExtensionMissing(controllerServiceNode.isExtensionMissing()); dto.setMultipleVersionsAvailable(compatibleBundles.size() > 1); + dto.setVersionedComponentId(controllerServiceNode.getVersionedComponentId().orElse(null)); // sort a copy of the properties final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() { @@ -1511,6 +1530,7 @@ public final class DtoFactory { dto.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks()); dto.setUseCompression(port.isUseCompression()); dto.setExists(port.getTargetExists()); + dto.setVersionedComponentId(port.getVersionedComponentId().orElse(null)); final BatchSettingsDTO batchDTO = new BatchSettingsDTO(); batchDTO.setCount(port.getBatchCount()); @@ -1619,6 +1639,7 @@ public final class DtoFactory { dto.setInactiveRemoteInputPortCount(inactiveRemoteInputPortCount); dto.setActiveRemoteOutputPortCount(activeRemoteOutputPortCount); dto.setInactiveRemoteOutputPortCount(inactiveRemoteOutputPortCount); + dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null)); final ProcessGroupCounts counts = group.getCounts(); if (counts != null) { @@ -1679,6 +1700,7 @@ public final class DtoFactory { dto.setId(componentAuthorizable.getIdentifier()); dto.setParentGroupId(componentAuthorizable.getProcessGroupIdentifier()); dto.setName(authorizable.getResource().getName()); + return dto; } @@ -1738,6 +1760,81 @@ public final class DtoFactory { return dto; } + public AffectedComponentEntity createAffectedComponentEntity(final ProcessorEntity processorEntity) { + if (processorEntity == null) { + return null; + } + + final AffectedComponentEntity component = new AffectedComponentEntity(); + component.setBulletins(processorEntity.getBulletins()); + component.setId(processorEntity.getId()); + component.setPermissions(processorEntity.getPermissions()); + component.setPosition(processorEntity.getPosition()); + component.setRevision(processorEntity.getRevision()); + component.setUri(processorEntity.getUri()); + + final ProcessorDTO processorDto = processorEntity.getComponent(); + final AffectedComponentDTO componentDto = new AffectedComponentDTO(); + componentDto.setId(processorDto.getId()); + componentDto.setName(processorDto.getName()); + componentDto.setProcessGroupId(processorDto.getParentGroupId()); + componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + componentDto.setState(processorDto.getState()); + componentDto.setValidationErrors(processorDto.getValidationErrors()); + component.setComponent(componentDto); + + return component; + } + + public AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceEntity serviceEntity) { + if (serviceEntity == null) { + return null; + } + + final AffectedComponentEntity component = new AffectedComponentEntity(); + component.setBulletins(serviceEntity.getBulletins()); + component.setId(serviceEntity.getId()); + component.setPermissions(serviceEntity.getPermissions()); + component.setPosition(serviceEntity.getPosition()); + component.setRevision(serviceEntity.getRevision()); + component.setUri(serviceEntity.getUri()); + + final ControllerServiceDTO serviceDto = serviceEntity.getComponent(); + final AffectedComponentDTO componentDto = new AffectedComponentDTO(); + componentDto.setId(serviceDto.getId()); + componentDto.setName(serviceDto.getName()); + componentDto.setProcessGroupId(serviceDto.getParentGroupId()); + componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + componentDto.setState(serviceDto.getState()); + componentDto.setValidationErrors(serviceDto.getValidationErrors()); + component.setComponent(componentDto); + + return component; + } + + public AffectedComponentEntity createAffectedComponentEntity(final RemoteProcessGroupPortDTO remotePortDto, final String referenceType, final RemoteProcessGroupEntity rpgEntity) { + if (remotePortDto == null) { + return null; + } + + final AffectedComponentEntity component = new AffectedComponentEntity(); + component.setId(remotePortDto.getId()); + component.setPermissions(rpgEntity.getPermissions()); + component.setRevision(rpgEntity.getRevision()); + component.setUri(rpgEntity.getUri()); + + final AffectedComponentDTO componentDto = new AffectedComponentDTO(); + componentDto.setId(remotePortDto.getId()); + componentDto.setName(remotePortDto.getName()); + componentDto.setProcessGroupId(remotePortDto.getGroupId()); + componentDto.setReferenceType(referenceType); + componentDto.setState(remotePortDto.isTransmitting() ? "Running" : "Stopped"); + component.setComponent(componentDto); + + return component; + } + + public AffectedComponentDTO createAffectedComponentDto(final ConfiguredComponent component) { final AffectedComponentDTO dto = new AffectedComponentDTO(); dto.setId(component.getIdentifier()); @@ -2047,6 +2144,8 @@ public final class DtoFactory { dto.setPosition(createPositionDto(group.getPosition())); dto.setComments(group.getComments()); dto.setName(group.getName()); + dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null)); + dto.setVersionControlInformation(createVersionControlInformationDto(group.getVersionControlInformation())); final Map<String, String> variables = group.getVariableRegistry().getVariableMap().entrySet().stream() .collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue())); @@ -2070,6 +2169,68 @@ public final class DtoFactory { return dto; } + public VersionControlInformationDTO createVersionControlInformationDto(final VersionControlInformation versionControlInfo) { + if (versionControlInfo == null) { + return null; + } + + final VersionControlInformationDTO dto = new VersionControlInformationDTO(); + dto.setRegistryId(versionControlInfo.getRegistryIdentifier()); + dto.setBucketId(versionControlInfo.getBucketIdentifier()); + dto.setFlowId(versionControlInfo.getFlowIdentifier()); + dto.setVersion(versionControlInfo.getVersion()); + dto.setCurrent(versionControlInfo.getCurrent().orElse(null)); + dto.setModified(versionControlInfo.getModified().orElse(null)); + return dto; + } + + public Map<String, String> createVersionControlComponentMappingDto(final InstantiatedVersionedProcessGroup group) { + final Map<String, String> mapping = new HashMap<>(); + + mapping.put(group.getInstanceId(), group.getIdentifier()); + group.getProcessors().stream() + .map(proc -> (InstantiatedVersionedProcessor) proc) + .forEach(proc -> mapping.put(proc.getInstanceId(), proc.getIdentifier())); + group.getInputPorts().stream() + .map(port -> (InstantiatedVersionedPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + group.getOutputPorts().stream() + .map(port -> (InstantiatedVersionedPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + group.getControllerServices().stream() + .map(service -> (InstantiatedVersionedControllerService) service) + .forEach(service -> mapping.put(service.getInstanceId(), service.getIdentifier())); + group.getLabels().stream() + .map(label -> (InstantiatedVersionedLabel) label) + .forEach(label -> mapping.put(label.getInstanceId(), label.getIdentifier())); + group.getConnections().stream() + .map(conn -> (InstantiatedVersionedConnection) conn) + .forEach(conn -> mapping.put(conn.getInstanceId(), conn.getIdentifier())); + group.getRemoteProcessGroups().stream() + .map(rpg -> (InstantiatedVersionedRemoteProcessGroup) rpg) + .forEach(rpg -> { + mapping.put(rpg.getInstanceId(), rpg.getIdentifier()); + + rpg.getInputPorts().stream() + .map(port -> (InstantiatedVersionedRemoteGroupPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + + rpg.getOutputPorts().stream() + .map(port -> (InstantiatedVersionedRemoteGroupPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + }); + + group.getProcessGroups().stream() + .map(child -> (InstantiatedVersionedProcessGroup) child) + .forEach(child -> { + final Map<String, String> childMapping = createVersionControlComponentMappingDto(child); + mapping.putAll(childMapping); + }); + + return mapping; + } + + /** * Creates a ProcessGroupContentDTO from the specified ProcessGroup. * @@ -2418,6 +2579,7 @@ public final class DtoFactory { dto.setDeprecated(node.isDeprecated()); dto.setExtensionMissing(node.isExtensionMissing()); dto.setMultipleVersionsAvailable(compatibleBundles.size() > 1); + dto.setVersionedComponentId(node.getVersionedComponentId().orElse(null)); dto.setType(node.getCanonicalClassName()); dto.setBundle(createBundleDto(bundleCoordinate)); @@ -2989,6 +3151,7 @@ public final class DtoFactory { copy.setPosition(original.getPosition()); copy.setWidth(original.getWidth()); copy.setHeight(original.getHeight()); + copy.setVersionedComponentId(original.getVersionedComponentId()); return copy; } @@ -3012,6 +3175,7 @@ public final class DtoFactory { copy.setMultipleVersionsAvailable(original.getMultipleVersionsAvailable()); copy.setPersistsState(original.getPersistsState()); copy.setValidationErrors(copy(original.getValidationErrors())); + copy.setVersionedComponentId(original.getVersionedComponentId()); return copy; } @@ -3020,6 +3184,7 @@ public final class DtoFactory { copy.setId(original.getId()); copy.setParentGroupId(original.getParentGroupId()); copy.setPosition(original.getPosition()); + copy.setVersionedComponentId(original.getVersionedComponentId()); return copy; } @@ -3088,6 +3253,7 @@ public final class DtoFactory { copy.setExtensionMissing(original.getExtensionMissing()); copy.setMultipleVersionsAvailable(original.getMultipleVersionsAvailable()); copy.setValidationErrors(copy(original.getValidationErrors())); + copy.setVersionedComponentId(original.getVersionedComponentId()); return copy; } @@ -3132,6 +3298,7 @@ public final class DtoFactory { copy.setzIndex(original.getzIndex()); copy.setLabelIndex(original.getLabelIndex()); copy.setBends(copy(original.getBends())); + copy.setVersionedComponentId(original.getVersionedComponentId()); return copy; } @@ -3164,6 +3331,7 @@ public final class DtoFactory { copy.setUserAccessControl(copy(original.getUserAccessControl())); copy.setGroupAccessControl(copy(original.getGroupAccessControl())); copy.setValidationErrors(copy(original.getValidationErrors())); + copy.setVersionedComponentId(original.getVersionedComponentId()); return copy; } @@ -3180,6 +3348,8 @@ public final class DtoFactory { copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount()); copy.setUseCompression(original.getUseCompression()); copy.setExists(original.getExists()); + copy.setVersionedComponentId(original.getVersionedComponentId()); + final BatchSettingsDTO batchOrg = original.getBatchSettings(); if (batchOrg != null) { final BatchSettingsDTO batchCopy = new BatchSettingsDTO(); @@ -3199,8 +3369,10 @@ public final class DtoFactory { copy.setInputPortCount(original.getInputPortCount()); copy.setInvalidCount(original.getInvalidCount()); copy.setName(original.getName()); + copy.setVersionControlInformation(copy(original.getVersionControlInformation())); copy.setOutputPortCount(original.getOutputPortCount()); copy.setParentGroupId(original.getParentGroupId()); + copy.setVersionedComponentId(original.getVersionedComponentId()); copy.setRunningCount(original.getRunningCount()); copy.setStoppedCount(original.getStoppedCount()); @@ -3215,6 +3387,21 @@ public final class DtoFactory { return copy; } + public VersionControlInformationDTO copy(final VersionControlInformationDTO original) { + if (original == null) { + return null; + } + + final VersionControlInformationDTO copy = new VersionControlInformationDTO(); + copy.setRegistryId(original.getRegistryId()); + copy.setBucketId(original.getBucketId()); + copy.setFlowId(original.getFlowId()); + copy.setVersion(original.getVersion()); + copy.setCurrent(original.getCurrent()); + copy.setModified(original.getModified()); + return copy; + } + public RemoteProcessGroupDTO copy(final RemoteProcessGroupDTO original) { final RemoteProcessGroupContentsDTO originalContents = original.getContents(); final RemoteProcessGroupContentsDTO copyContents = new RemoteProcessGroupContentsDTO(); @@ -3256,6 +3443,7 @@ public final class DtoFactory { copy.setProxyUser(original.getProxyUser()); copy.setProxyPassword(original.getProxyPassword()); copy.setLocalNetworkInterface(original.getLocalNetworkInterface()); + copy.setVersionedComponentId(original.getVersionedComponentId()); copy.setContents(copyContents); @@ -3268,6 +3456,7 @@ public final class DtoFactory { connectable.setId(port.getId()); connectable.setName(port.getName()); connectable.setType(type.name()); + connectable.setVersionedComponentId(port.getVersionedComponentId()); return connectable; } @@ -3277,6 +3466,7 @@ public final class DtoFactory { connectable.setId(processor.getId()); connectable.setName(processor.getName()); connectable.setType(ConnectableType.PROCESSOR.name()); + connectable.setVersionedComponentId(processor.getVersionedComponentId()); return connectable; } @@ -3285,6 +3475,7 @@ public final class DtoFactory { connectable.setGroupId(funnel.getParentGroupId()); connectable.setId(funnel.getId()); connectable.setType(ConnectableType.FUNNEL.name()); + connectable.setVersionedComponentId(funnel.getVersionedComponentId()); return connectable; } @@ -3294,6 +3485,7 @@ public final class DtoFactory { connectable.setId(remoteGroupPort.getId()); connectable.setName(remoteGroupPort.getName()); connectable.setType(type.name()); + connectable.setVersionedComponentId(connectable.getVersionedComponentId()); return connectable; } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java index 16781c6..dd8d67f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java @@ -67,6 +67,7 @@ import org.apache.nifi.web.api.entity.TenantEntity; import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; import org.apache.nifi.web.api.entity.VariableRegistryEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import java.util.Date; import java.util.List; @@ -537,4 +538,11 @@ public final class EntityFactory { } return entity; } + + public VersionControlInformationEntity createVersionControlInformationEntity(final VersionControlInformationDTO dto, final RevisionDTO processGroupRevision) { + final VersionControlInformationEntity entity = new VersionControlInformationEntity(); + entity.setVersionControlInformation(dto); + entity.setProcessGroupRevision(processGroupRevision); + return entity; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index a4e8000..615f00b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -1599,6 +1599,7 @@ public class ControllerFacade implements Authorizable { final List<String> matches = new ArrayList<>(); addIfAppropriate(searchStr, port.getIdentifier(), "Id", matches); + addIfAppropriate(searchStr, port.getVersionedComponentId().orElse(null), "Version Control ID", matches); addIfAppropriate(searchStr, port.getName(), "Name", matches); addIfAppropriate(searchStr, port.getComments(), "Comments", matches); @@ -1649,6 +1650,7 @@ public class ControllerFacade implements Authorizable { final Processor processor = procNode.getProcessor(); addIfAppropriate(searchStr, procNode.getIdentifier(), "Id", matches); + addIfAppropriate(searchStr, procNode.getVersionedComponentId().orElse(null), "Version Control ID", matches); addIfAppropriate(searchStr, procNode.getName(), "Name", matches); addIfAppropriate(searchStr, procNode.getComments(), "Comments", matches); @@ -1753,6 +1755,7 @@ public class ControllerFacade implements Authorizable { } addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches); + addIfAppropriate(searchStr, group.getVersionedComponentId().orElse(null), "Version Control ID", matches); addIfAppropriate(searchStr, group.getName(), "Name", matches); addIfAppropriate(searchStr, group.getComments(), "Comments", matches); @@ -1783,6 +1786,7 @@ public class ControllerFacade implements Authorizable { // search id and name addIfAppropriate(searchStr, connection.getIdentifier(), "Id", matches); + addIfAppropriate(searchStr, connection.getVersionedComponentId().orElse(null), "Version Control ID", matches); addIfAppropriate(searchStr, connection.getName(), "Name", matches); // search relationships @@ -1864,6 +1868,7 @@ public class ControllerFacade implements Authorizable { private ComponentSearchResultDTO search(final String searchStr, final RemoteProcessGroup group) { final List<String> matches = new ArrayList<>(); addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches); + addIfAppropriate(searchStr, group.getVersionedComponentId().orElse(null), "Version Control ID", matches); addIfAppropriate(searchStr, group.getName(), "Name", matches); addIfAppropriate(searchStr, group.getComments(), "Comments", matches); addIfAppropriate(searchStr, group.getTargetUris(), "URLs", matches); @@ -1889,6 +1894,7 @@ public class ControllerFacade implements Authorizable { private ComponentSearchResultDTO search(final String searchStr, final Funnel funnel) { final List<String> matches = new ArrayList<>(); addIfAppropriate(searchStr, funnel.getIdentifier(), "Id", matches); + addIfAppropriate(searchStr, funnel.getVersionedComponentId().orElse(null), "Version Control ID", matches); if (matches.isEmpty()) { return null; http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index d7ca806..5f4dba5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -16,14 +16,17 @@ */ package org.apache.nifi.web.dao; +import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; public interface ProcessGroupDAO { @@ -104,6 +107,27 @@ public interface ProcessGroupDAO { ProcessGroup updateProcessGroup(ProcessGroupDTO processGroup); /** + * Updates the process group so that it matches the proposed flow + * + * @param groupId the ID of the process group + * @param proposedSnapshot Flow the new version of the flow + * @param versionControlInformation the new Version Control Information + * @param the seed value to use for generating ID's for new components + * @return the process group + */ + ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed, + boolean verifyNotModified); + + /** + * Applies the given Version Control Information to the Process Group + * + * @param versionControlInformation the Version Control Information to apply + * @param versionedComponentMapping a mapping of Component ID to Versioned Component ID + * @return the Process Group + */ + ProcessGroup updateVersionControlInformation(VersionControlInformationDTO versionControlInformation, Map<String, String> versionedComponentMapping); + + /** * Updates the specified variable registry * * @param variableRegistry the Variable Registry http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index ec584de..6fa316d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -16,15 +16,14 @@ */ package org.apache.nifi.web.dao.impl; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.FlowController; @@ -33,9 +32,14 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.dao.ProcessGroupDAO; @@ -90,24 +94,30 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou public void verifyScheduleComponents(final String groupId, final ScheduledState state,final Set<String> componentIds) { final ProcessGroup group = locateProcessGroup(flowController, groupId); - final Set<Connectable> connectables = new HashSet<>(componentIds.size()); for (final String componentId : componentIds) { final Connectable connectable = group.findLocalConnectable(componentId); if (connectable == null) { - throw new ResourceNotFoundException("Unable to find component with id " + componentId); - } + final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId); + if (remotePort == null) { + throw new ResourceNotFoundException("Unable to find component with id " + componentId); + } - connectables.add(connectable); - } + if (ScheduledState.RUNNING.equals(state)) { + remotePort.verifyCanStart(); + } else { + remotePort.verifyCanStop(); + } - // verify as appropriate - connectables.forEach(connectable -> { + continue; + } + + // verify as appropriate if (ScheduledState.RUNNING.equals(state)) { group.verifyCanStart(connectable); } else { group.verifyCanStop(connectable); } - }); + } } @Override @@ -134,22 +144,46 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou for (final String componentId : componentIds) { final Connectable connectable = group.findLocalConnectable(componentId); if (ScheduledState.RUNNING.equals(state)) { - if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) { - final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true); - future = CompletableFuture.allOf(future, processorFuture); - } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) { - connectable.getProcessGroup().startInputPort((Port) connectable); - } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) { - connectable.getProcessGroup().startOutputPort((Port) connectable); + switch (connectable.getConnectableType()) { + case PROCESSOR: + final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true); + future = CompletableFuture.allOf(future, processorFuture); + break; + case INPUT_PORT: + connectable.getProcessGroup().startInputPort((Port) connectable); + break; + case OUTPUT_PORT: + connectable.getProcessGroup().startOutputPort((Port) connectable); + break; + case REMOTE_INPUT_PORT: + final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId); + remoteInputPort.getRemoteProcessGroup().startTransmitting(remoteInputPort); + break; + case REMOTE_OUTPUT_PORT: + final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId); + remoteOutputPort.getRemoteProcessGroup().startTransmitting(remoteOutputPort); + break; } } else { - if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) { - final CompletableFuture<?> processorFuture = connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable); - future = CompletableFuture.allOf(future, processorFuture); - } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) { - connectable.getProcessGroup().stopInputPort((Port) connectable); - } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) { - connectable.getProcessGroup().stopOutputPort((Port) connectable); + switch (connectable.getConnectableType()) { + case PROCESSOR: + final CompletableFuture<?> processorFuture = connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable); + future = CompletableFuture.allOf(future, processorFuture); + break; + case INPUT_PORT: + connectable.getProcessGroup().stopInputPort((Port) connectable); + break; + case OUTPUT_PORT: + connectable.getProcessGroup().stopOutputPort((Port) connectable); + break; + case REMOTE_INPUT_PORT: + final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId); + remoteInputPort.getRemoteProcessGroup().stopTransmitting(remoteInputPort); + break; + case REMOTE_OUTPUT_PORT: + final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId); + remoteOutputPort.getRemoteProcessGroup().stopTransmitting(remoteOutputPort); + break; } } } @@ -197,6 +231,41 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override + public ProcessGroup updateVersionControlInformation(final VersionControlInformationDTO versionControlInformation, final Map<String, String> versionedComponentMapping) { + final String groupId = versionControlInformation.getGroupId(); + final ProcessGroup group = locateProcessGroup(flowController, groupId); + + final String registryId = versionControlInformation.getRegistryId(); + final String bucketId = versionControlInformation.getBucketId(); + final String flowId = versionControlInformation.getFlowId(); + final int version = versionControlInformation.getVersion(); + + final VersionControlInformation vci = new StandardVersionControlInformation(registryId, bucketId, flowId, version, null, false, true); + group.setVersionControlInformation(vci, versionedComponentMapping); + + return group; + } + + @Override + public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation, + final String componentIdSeed, final boolean verifyNotModified) { + final ProcessGroup group = locateProcessGroup(flowController, groupId); + group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified); + + final StandardVersionControlInformation svci = new StandardVersionControlInformation( + versionControlInformation.getRegistryId(), + versionControlInformation.getBucketId(), + versionControlInformation.getFlowId(), + versionControlInformation.getVersion(), + proposedSnapshot.getFlowContents(), + versionControlInformation.getModified(), + versionControlInformation.getCurrent()); + + group.setVersionControlInformation(svci, Collections.emptyMap()); + return group; + } + + @Override public ProcessGroup updateVariableRegistry(final VariableRegistryDTO variableRegistry) { final ProcessGroup group = locateProcessGroup(flowController, variableRegistry.getProcessGroupId()); if (group == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java new file mode 100644 index 0000000..7fdaf56 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.util; + +import java.util.Optional; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.api.dto.AffectedComponentDTO; +import org.apache.nifi.web.api.dto.DtoFactory; +import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.entity.AffectedComponentEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; + +public class AffectedComponentUtils { + + public static AffectedComponentEntity updateEntity(final AffectedComponentEntity componentEntity, final NiFiServiceFacade serviceFacade, + final DtoFactory dtoFactory, final NiFiUser user) { + + switch (componentEntity.getComponent().getReferenceType()) { + case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR: + final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user); + return dtoFactory.createAffectedComponentEntity(procEntity); + case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT: { + final RemoteProcessGroupEntity remoteGroupEntity = serviceFacade.getRemoteProcessGroup(componentEntity.getComponent().getProcessGroupId(), user); + final RemoteProcessGroupContentsDTO remoteGroupContents = remoteGroupEntity.getComponent().getContents(); + final Optional<RemoteProcessGroupPortDTO> portDtoOption = remoteGroupContents.getInputPorts().stream() + .filter(port -> port.getId().equals(componentEntity.getId())) + .findFirst(); + + if (portDtoOption.isPresent()) { + final RemoteProcessGroupPortDTO portDto = portDtoOption.get(); + return dtoFactory.createAffectedComponentEntity(portDto, AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT, remoteGroupEntity); + } + break; + } + case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT: { + final RemoteProcessGroupEntity remoteGroupEntity = serviceFacade.getRemoteProcessGroup(componentEntity.getComponent().getProcessGroupId(), user); + final RemoteProcessGroupContentsDTO remoteGroupContents = remoteGroupEntity.getComponent().getContents(); + final Optional<RemoteProcessGroupPortDTO> portDtoOption = remoteGroupContents.getOutputPorts().stream() + .filter(port -> port.getId().equals(componentEntity.getId())) + .findFirst(); + + if (portDtoOption.isPresent()) { + final RemoteProcessGroupPortDTO portDto = portDtoOption.get(); + return dtoFactory.createAffectedComponentEntity(portDto, AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT, remoteGroupEntity); + } + break; + } + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java new file mode 100644 index 0000000..a6efb71 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.util; + +import java.util.concurrent.TimeUnit; + +public class CancellableTimedPause implements Pause { + private final long expirationNanoTime; + private final long pauseNanos; + private volatile boolean cancelled = false; + + public CancellableTimedPause(final long pauseTime, final long expirationTime, final TimeUnit timeUnit) { + final long expirationNanos = TimeUnit.NANOSECONDS.convert(expirationTime, timeUnit); + expirationNanoTime = System.nanoTime() + expirationNanos; + pauseNanos = Math.max(1L, TimeUnit.NANOSECONDS.convert(pauseTime, timeUnit)); + } + + public void cancel() { + cancelled = true; + } + + @Override + public boolean pause() { + if (cancelled) { + return false; + } + + long sysTime = System.nanoTime(); + final long maxWaitTime = System.nanoTime() + pauseNanos; + while (sysTime < maxWaitTime) { + try { + TimeUnit.NANOSECONDS.wait(pauseNanos); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return false; + } + + sysTime = System.nanoTime(); + } + + return sysTime < expirationNanoTime && !cancelled; + } + +}
