Repository: nifi Updated Branches: refs/heads/master c1b99d584 -> 5cd8e93be
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index 9af57b6..258af72 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -16,6 +16,13 @@ */ package org.apache.nifi.web.dao.impl; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Port; @@ -23,14 +30,15 @@ import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; +import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.dao.ProcessGroupDAO; -import java.util.HashSet; 
-import java.util.Set; - public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGroupDAO { private FlowController flowController; @@ -64,6 +72,10 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override + public void verifyUpdate(final ProcessGroupDTO processGroup) { + } + + @Override public ProcessGroup getProcessGroup(String groupId) { return locateProcessGroup(flowController, groupId); } @@ -99,14 +111,32 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override - public void scheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) { + public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) { final ProcessGroup group = locateProcessGroup(flowController, groupId); + group.findAllControllerServices().stream() + .filter(service -> serviceIds.contains(service.getIdentifier())) + .forEach(service -> { + if (state == ControllerServiceState.ENABLED) { + service.verifyCanEnable(); + } else { + service.verifyCanDisable(); + } + }); + } + + @Override + public CompletableFuture<Void> scheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) { + final ProcessGroup group = locateProcessGroup(flowController, groupId); + + CompletableFuture<Void> future = CompletableFuture.completedFuture(null); + for (final String componentId : componentIds) { final Connectable connectable = group.findLocalConnectable(componentId); if (ScheduledState.RUNNING.equals(state)) { if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) { - connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); + final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); + future = CompletableFuture.allOf(future, processorFuture); } else if 
(ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) { connectable.getProcessGroup().startInputPort((Port) connectable); } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) { @@ -114,7 +144,8 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } } else { if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) { - connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable); + final CompletableFuture<?> processorFuture = connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable); + future = CompletableFuture.allOf(future, processorFuture); } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) { connectable.getProcessGroup().stopInputPort((Port) connectable); } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) { @@ -122,6 +153,27 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } } } + + return future; + } + + @Override + public Future<Void> activateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) { + final ProcessGroup group = locateProcessGroup(flowController, groupId); + + CompletableFuture<Void> future = CompletableFuture.completedFuture(null); + for (final String serviceId : serviceIds) { + final ControllerServiceNode serviceNode = group.findControllerService(serviceId); + if (ControllerServiceState.ENABLED.equals(state)) { + final CompletableFuture<Void> serviceFuture = flowController.enableControllerService(serviceNode); + future = CompletableFuture.allOf(future, serviceFuture); + } else { + final CompletableFuture<Void> serviceFuture = flowController.disableControllerService(serviceNode); + future = CompletableFuture.allOf(future, serviceFuture); + } + } + + return future; } @Override @@ -145,6 +197,22 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override 
+ public ProcessGroup updateVariableRegistry(final VariableRegistryDTO variableRegistry) { + final ProcessGroup group = locateProcessGroup(flowController, variableRegistry.getProcessGroupId()); + if (group == null) { + throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistry.getProcessGroupId()); + } + + final Map<String, String> variableMap = new HashMap<>(); + variableRegistry.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null + .map(VariableEntity::getVariable) + .forEach(var -> variableMap.put(var.getName(), var.getValue())); + + group.setVariables(variableMap); + return group; + } + + @Override public void verifyDelete(String groupId) { ProcessGroup group = locateProcessGroup(flowController, groupId); group.verifyCanDelete(); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/Pause.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/Pause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/Pause.java new file mode 100644 index 0000000..c2ef890 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/Pause.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.util; + +public interface Pause { + + /** + * Waits up to a given amount of time, and returns <code>true</code> if the action being performed + * should continue, <code>false</code> if the action being performed has been cancelled. + * + * @return <code>true</code> if the action should continue, <code>false</code> otherwise + */ + boolean pause(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 2311b07..8ee39f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -284,6 +284,7 @@ <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> <property name="flowController" ref="flowController" /> + <property name="dtoFactory" ref="dtoFactory" /> </bean> <bean id="processorResource" class="org.apache.nifi.web.api.ProcessorResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/>
