Repository: nifi
Updated Branches:
  refs/heads/master c1b99d584 -> 5cd8e93be


http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index 9af57b6..258af72 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -16,6 +16,13 @@
  */
 package org.apache.nifi.web.dao.impl;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Port;
@@ -23,14 +30,15 @@ import org.apache.nifi.connectable.Position;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.dao.ProcessGroupDAO;
 
-import java.util.HashSet;
-import java.util.Set;
-
 public class StandardProcessGroupDAO extends ComponentDAO implements 
ProcessGroupDAO {
 
     private FlowController flowController;
@@ -64,6 +72,10 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     }
 
     @Override
+    public void verifyUpdate(final ProcessGroupDTO processGroup) {
+    }
+
+    @Override
     public ProcessGroup getProcessGroup(String groupId) {
         return locateProcessGroup(flowController, groupId);
     }
@@ -99,14 +111,32 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     }
 
     @Override
-    public void scheduleComponents(final String groupId, final ScheduledState 
state, final Set<String> componentIds) {
+    public void verifyActivateControllerServices(final String groupId, final 
ControllerServiceState state, final Set<String> serviceIds) {
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
 
+        group.findAllControllerServices().stream()
+            .filter(service -> serviceIds.contains(service.getIdentifier()))
+            .forEach(service -> {
+                if (state == ControllerServiceState.ENABLED) {
+                    service.verifyCanEnable();
+                } else {
+                    service.verifyCanDisable();
+                }
+            });
+    }
+
+    @Override
+    public CompletableFuture<Void> scheduleComponents(final String groupId, 
final ScheduledState state, final Set<String> componentIds) {
+        final ProcessGroup group = locateProcessGroup(flowController, groupId);
+
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+
         for (final String componentId : componentIds) {
             final Connectable connectable = 
group.findLocalConnectable(componentId);
             if (ScheduledState.RUNNING.equals(state)) {
                 if 
(ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
-                    
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
+                    final CompletableFuture<?> processorFuture = 
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
+                    future = CompletableFuture.allOf(future, processorFuture);
                 } else if 
(ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) {
                     connectable.getProcessGroup().startInputPort((Port) 
connectable);
                 } else if 
(ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) {
@@ -114,7 +144,8 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
                 }
             } else {
                 if 
(ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
-                    
connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable);
+                    final CompletableFuture<?> processorFuture = 
connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable);
+                    future = CompletableFuture.allOf(future, processorFuture);
                 } else if 
(ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) {
                     connectable.getProcessGroup().stopInputPort((Port) 
connectable);
                 } else if 
(ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) {
@@ -122,6 +153,27 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
                 }
             }
         }
+
+        return future;
+    }
+
+    @Override
+    public Future<Void> activateControllerServices(final String groupId, final 
ControllerServiceState state, final Set<String> serviceIds) {
+        final ProcessGroup group = locateProcessGroup(flowController, groupId);
+
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+        for (final String serviceId : serviceIds) {
+            final ControllerServiceNode serviceNode = 
group.findControllerService(serviceId);
+            if (ControllerServiceState.ENABLED.equals(state)) {
+                final CompletableFuture<Void> serviceFuture = 
flowController.enableControllerService(serviceNode);
+                future = CompletableFuture.allOf(future, serviceFuture);
+            } else {
+                final CompletableFuture<Void> serviceFuture = 
flowController.disableControllerService(serviceNode);
+                future = CompletableFuture.allOf(future, serviceFuture);
+            }
+        }
+
+        return future;
     }
 
     @Override
@@ -145,6 +197,22 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     }
 
     @Override
+    public ProcessGroup updateVariableRegistry(final VariableRegistryDTO 
variableRegistry) {
+        final ProcessGroup group = locateProcessGroup(flowController, 
variableRegistry.getProcessGroupId());
+        if (group == null) {
+            throw new ResourceNotFoundException("Could not find Process Group 
with ID " + variableRegistry.getProcessGroupId());
+        }
+
+        final Map<String, String> variableMap = new HashMap<>();
+        variableRegistry.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
+            .map(VariableEntity::getVariable)
+            .forEach(var -> variableMap.put(var.getName(), var.getValue()));
+
+        group.setVariables(variableMap);
+        return group;
+    }
+
+    @Override
     public void verifyDelete(String groupId) {
         ProcessGroup group = locateProcessGroup(flowController, groupId);
         group.verifyCanDelete();

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/Pause.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/Pause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/Pause.java
new file mode 100644
index 0000000..c2ef890
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/Pause.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.util;
+
+public interface Pause {
+
+    /**
+     * Waits up to given amount of time, and returns <code>true</code> if the action being performed
+     * should continue, <code>false</code> if the action being performed has been cancelled
+     *
+     * @return <code>true</code> if the action should continue, <code>false</code> otherwise
+     */
+    boolean pause();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 2311b07..8ee39f0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -284,6 +284,7 @@
         <property name="requestReplicator" ref="requestReplicator" />
         <property name="authorizer" ref="authorizer"/>
         <property name="flowController" ref="flowController" />
+        <property name="dtoFactory" ref="dtoFactory" />
     </bean>
     <bean id="processorResource" 
class="org.apache.nifi.web.api.ProcessorResource" scope="singleton">
         <property name="serviceFacade" ref="serviceFacade"/>

Reply via email to