This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bcfcf4  NIFI-7788 Created a new endpoint in 
RemoteProcessGroupResource to allow updating run statuses/transmission state of 
all remote process groups within a process group.
5bcfcf4 is described below

commit 5bcfcf42bb121e5a8324bf7443359c0461965233
Author: Tamas Palfy <[email protected]>
AuthorDate: Tue Sep 8 18:26:00 2020 +0200

    NIFI-7788 Created a new endpoint in RemoteProcessGroupResource to allow 
updating run statuses/transmission state of all remote process groups within a 
process group.
    
    When selecting run/stop on a process group/canvas/selection, the UI will 
also try to enable/disable transmission of all involved remote process groups.
    
    NIFI-7788 Supplied same functionality missed when selecting a process group.
    NIFI-7788 Updated endpoint URL paths.
    NIFI-7788 No need to return list of remote process groups when updating en 
masse.
    NIFI-7788 Added some null checks in RemoteProcessGroupsEndpointMerger.merge.
    NIFI-7788 Fix checkstyle violation.
    
    This closes #4516.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../RemoteProcessGroupsEndpointMerger.java         |  44 ++++++---
 .../org/apache/nifi/web/NiFiServiceFacade.java     |   8 ++
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  14 +++
 .../nifi/web/api/RemoteProcessGroupResource.java   | 102 +++++++++++++++++++++
 .../nifi/web/StandardNiFiServiceFacadeTest.java    |  72 ++++++++++++++-
 .../src/main/webapp/js/nf/canvas/nf-actions.js     |  64 ++++++++++---
 6 files changed, 274 insertions(+), 30 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
index 79cfa1d..ce5be06 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
@@ -32,10 +32,17 @@ import java.util.regex.Pattern;
 
 public class RemoteProcessGroupsEndpointMerger implements 
EndpointResponseMerger {
     public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
+    public static final Pattern REMOTE_PROCESS_GROUPS_RUN_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/remote-process-groups/process-group/[a-f0-9\\-]{36}/run-status");
 
     @Override
     public boolean canHandle(final URI uri, final String method) {
-        return "GET".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
+        if ("GET".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("PUT".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUPS_RUN_STATUS_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+
+        return false;
     }
 
     @Override
@@ -47,24 +54,31 @@ public class RemoteProcessGroupsEndpointMerger implements 
EndpointResponseMerger
         final RemoteProcessGroupsEntity responseEntity = 
clientResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
         final Set<RemoteProcessGroupEntity> rpgEntities = 
responseEntity.getRemoteProcessGroups();
 
-        final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> 
entityMap = new HashMap<>();
-        for (final NodeResponse nodeResponse : successfulResponses) {
-            final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse 
== clientResponse ? responseEntity : 
nodeResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
-            final Set<RemoteProcessGroupEntity> nodeRpgEntities = 
nodeResponseEntity.getRemoteProcessGroups();
+        if (rpgEntities != null) {
+            final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> 
entityMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : successfulResponses) {
+                final RemoteProcessGroupsEntity nodeResponseEntity =
+                    nodeResponse == clientResponse
+                        ? responseEntity
+                        : 
nodeResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
+                final Set<RemoteProcessGroupEntity> nodeRpgEntities = 
nodeResponseEntity.getRemoteProcessGroups();
 
-            for (final RemoteProcessGroupEntity nodeRpgEntity : 
nodeRpgEntities) {
-                final NodeIdentifier nodeId = nodeResponse.getNodeId();
-                Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = 
entityMap.get(nodeId);
-                if (innerMap == null) {
-                    innerMap = new HashMap<>();
-                    entityMap.put(nodeRpgEntity.getId(), innerMap);
-                }
+                if (nodeRpgEntities != null) {
+                    for (final RemoteProcessGroupEntity nodeRpgEntity : 
nodeRpgEntities) {
+                        final NodeIdentifier nodeId = nodeResponse.getNodeId();
+                        Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap 
= entityMap.get(nodeId);
+                        if (innerMap == null) {
+                            innerMap = new HashMap<>();
+                            entityMap.put(nodeRpgEntity.getId(), innerMap);
+                        }
 
-                innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity);
+                        innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity);
+                    }
+                }
             }
-        }
 
-        RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(rpgEntities, 
entityMap);
+            
RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(rpgEntities, 
entityMap);
+        }
 
         // create a new client response
         return new NodeResponse(clientResponse, responseEntity);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 16df93e..d455836 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1310,6 +1310,14 @@ public interface NiFiServiceFacade {
      */
     StatusHistoryEntity getRemoteProcessGroupStatusHistory(String id);
 
+
+    /**
+     * Verifies that transmission state of all remote process groups within 
the specified process group can be updated.
+     * @param processGroupId The process group in which to verify remote 
process groups
+     * @param shouldTransmit The transmission state to verify for
+     */
+    void verifyUpdateRemoteProcessGroups(String processGroupId, boolean 
shouldTransmit);
+
     /**
      * Verifies the specified remote process group can be updated.
      *
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 804e2d9..5e90135 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -571,6 +571,20 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public void verifyUpdateRemoteProcessGroups(String processGroupId, boolean 
shouldTransmit) {
+        List<RemoteProcessGroup> allRemoteProcessGroups = 
processGroupDAO.getProcessGroup(processGroupId).findAllRemoteProcessGroups();
+
+        allRemoteProcessGroups.stream()
+            .map(remoteProcessGroup -> {
+                final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
+                dto.setId(remoteProcessGroup.getIdentifier());
+                dto.setTransmitting(shouldTransmit);
+                return dto;
+            })
+            .forEach(this::verifyUpdateRemoteProcessGroup);
+    }
+
+    @Override
     public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO 
remoteProcessGroupDTO) {
         // if remote group does not exist, then the update request is likely 
creating it
         // so we don't verify since it will fail
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index 575e01b..a5ac39c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -24,10 +24,12 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.ProcessGroupAuthorizable;
 import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.OperationAuthorizable;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.ComponentStateDTO;
@@ -39,6 +41,7 @@ import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.RemotePortRunStatusEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 
@@ -58,6 +61,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.net.URI;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * RESTful endpoint for managing a Remote group.
@@ -830,6 +834,104 @@ public class RemoteProcessGroupResource extends 
ApplicationResource {
     }
 
     /**
+     * Updates the operational status for all remote process groups in the 
specified process group with the specified value.
+     *
+     * @param httpServletRequest                request
+     * @param processGroupId                    The id of the process group 
whose remote process groups are to be updated.
+     * @param requestRemotePortRunStatusEntity  A remotePortRunStatusEntity 
that holds the desired run status
+     * @return A response with an empty RemoteProcessGroupsEntity.
+     */
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("process-group/{id}/run-status")
+    @ApiOperation(
+            value = "Updates run status of all remote process groups in a 
process group (recursively)",
+            response = RemoteProcessGroupEntity.class,
+            authorizations = {
+                    @Authorization(value = "Write - 
/remote-process-groups/{uuid} or /operation/remote-process-groups/{uuid}")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful.")
+            }
+    )
+    public Response updateRemoteProcessGroupRunStatuses(
+            @Context HttpServletRequest httpServletRequest,
+            @ApiParam(
+                    value = "The process group id.",
+                    required = true
+            )
+            @PathParam("id") String processGroupId,
+            @ApiParam(
+                    value = "The remote process groups run status.",
+                    required = true
+            ) final RemotePortRunStatusEntity requestRemotePortRunStatusEntity
+    ) {
+        if (requestRemotePortRunStatusEntity == null) {
+            throw new IllegalArgumentException("Remote process group run 
status must be specified.");
+        }
+
+        requestRemotePortRunStatusEntity.validateState();
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.PUT, requestRemotePortRunStatusEntity);
+        } else if (isDisconnectedFromCluster()) {
+            
verifyDisconnectedNodeModification(requestRemotePortRunStatusEntity.isDisconnectedNodeAcknowledged());
+        }
+
+        // handle expects request (usually from the cluster manager)
+        final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(
+            processGroupId,
+            group -> group.findAllRemoteProcessGroups().stream()
+                .filter(remoteProcessGroup ->
+                    
requestRemotePortRunStatusEntity.getState().equals("TRANSMITTING") && 
!remoteProcessGroup.isTransmitting()
+                    || 
requestRemotePortRunStatusEntity.getState().equals("STOPPED") && 
remoteProcessGroup.isTransmitting()
+                )
+                .filter(remoteProcessGroup -> 
OperationAuthorizable.isOperationAuthorized(remoteProcessGroup, authorizer, 
NiFiUserUtils.getNiFiUser()))
+                .map(RemoteProcessGroup::getIdentifier)
+                .collect(Collectors.toSet())
+        );
+        return withWriteLock(
+            serviceFacade,
+            requestRemotePortRunStatusEntity,
+            revisions,
+            lookup -> {
+                final ProcessGroupAuthorizable processGroup = 
lookup.getProcessGroup(processGroupId);
+
+                authorizeProcessGroup(processGroup, authorizer, lookup, 
RequestAction.READ, false, false, false, false, false);
+
+                Set<Authorizable> remoteProcessGroups = 
processGroup.getEncapsulatedRemoteProcessGroups();
+                for (Authorizable remoteProcessGroup : remoteProcessGroups) {
+                    
OperationAuthorizable.authorizeOperation(remoteProcessGroup, authorizer, 
NiFiUserUtils.getNiFiUser());
+                }
+            },
+            () -> 
serviceFacade.verifyUpdateRemoteProcessGroups(processGroupId, 
shouldTransmit(requestRemotePortRunStatusEntity)),
+            (_revisions, remotePortRunStatusEntity) -> {
+                Set<RemoteProcessGroupEntity> remoteProcessGroupEntities = 
_revisions.stream()
+                    .map(revision -> {
+                        final RemoteProcessGroupEntity entity = 
serviceFacade.updateRemoteProcessGroup(revision, 
createDTOWithDesiredRunStatus(revision.getComponentId(), 
remotePortRunStatusEntity));
+                        
populateRemainingRemoteProcessGroupEntityContent(entity);
+
+                        return entity;
+                    })
+                    .collect(Collectors.toSet());
+
+                RemoteProcessGroupsEntity remoteProcessGroupsEntity = new 
RemoteProcessGroupsEntity();
+
+                Response response = 
generateOkResponse(remoteProcessGroupsEntity).build();
+
+                return response;
+            }
+        );
+    }
+
+    /**
      * Gets the state for a RemoteProcessGroup.
      *
      * @param id The id of the RemoteProcessGroup
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 54c0b97..87d6ab5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -38,6 +38,7 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.history.History;
 import org.apache.nifi.history.HistoryQuery;
 import org.apache.nifi.nar.ExtensionManager;
@@ -50,6 +51,7 @@ import 
org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
 import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.web.api.dto.DtoFactory;
 import org.apache.nifi.web.api.dto.EntityFactory;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.action.HistoryDTO;
 import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
@@ -57,18 +59,23 @@ import org.apache.nifi.web.api.entity.ActionEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.dao.ProcessGroupDAO;
+import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
 import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.springframework.security.core.Authentication;
 import org.springframework.security.core.context.SecurityContextHolder;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.Date;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -186,7 +193,7 @@ public class StandardNiFiServiceFacadeTest {
         final ControllerFacade controllerFacade = new ControllerFacade();
         controllerFacade.setFlowController(flowController);
 
-        processGroupDAO = mock(ProcessGroupDAO.class);
+        processGroupDAO = mock(ProcessGroupDAO.class, 
Answers.RETURNS_DEEP_STUBS);
 
         serviceFacade = new StandardNiFiServiceFacade();
         serviceFacade.setAuditService(auditService);
@@ -418,4 +425,67 @@ public class StandardNiFiServiceFacadeTest {
         
assertTrue(serviceFacade.isAnyProcessGroupUnderVersionControl(groupId));
     }
 
+    @Test
+    public void testVerifyUpdateRemoteProcessGroups() throws Exception {
+        // GIVEN
+        RemoteProcessGroupDAO remoteProcessGroupDAO = 
mock(RemoteProcessGroupDAO.class);
+        serviceFacade.setRemoteProcessGroupDAO(remoteProcessGroupDAO);
+
+        String groupId = "groupId";
+        boolean shouldTransmit = true;
+
+        String remoteProcessGroupId1 = "remoteProcessGroupId1";
+        String remoteProcessGroupId2 = "remoteProcessGroupId2";
+
+        List<RemoteProcessGroup> remoteProcessGroups = Arrays.asList(
+            // Current 'transmitting' status should not influence the 
verification, which should be solely based on the 'shouldTransmit' value
+            mockRemoteProcessGroup(remoteProcessGroupId1, true),
+            mockRemoteProcessGroup(remoteProcessGroupId2, false)
+        );
+
+        List<RemoteProcessGroupDTO> expected = Arrays.asList(
+            createRemoteProcessGroupDTO(remoteProcessGroupId1, shouldTransmit),
+            createRemoteProcessGroupDTO(remoteProcessGroupId2, shouldTransmit)
+        );
+
+        
when(processGroupDAO.getProcessGroup(groupId).findAllRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+        expected.stream()
+            .map(RemoteProcessGroupDTO::getId)
+            .forEach(remoteProcessGroupId -> 
when(remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupId)).thenReturn(true));
+
+
+        // WHEN
+        serviceFacade.verifyUpdateRemoteProcessGroups(groupId, shouldTransmit);
+
+        // THEN
+        ArgumentCaptor<RemoteProcessGroupDTO> 
remoteProcessGroupDTOArgumentCaptor = 
ArgumentCaptor.forClass(RemoteProcessGroupDTO.class);
+
+        verify(remoteProcessGroupDAO, 
times(remoteProcessGroups.size())).verifyUpdate(remoteProcessGroupDTOArgumentCaptor.capture());
+
+        List<RemoteProcessGroupDTO> actual = 
remoteProcessGroupDTOArgumentCaptor.getAllValues();
+
+        assertEquals(toMap(expected), toMap(actual));
+    }
+
+    private Map<String, Boolean> toMap(List<RemoteProcessGroupDTO> list) {
+        return 
list.stream().collect(Collectors.toMap(RemoteProcessGroupDTO::getId, 
RemoteProcessGroupDTO::isTransmitting));
+    }
+
+    private RemoteProcessGroup mockRemoteProcessGroup(String identifier, 
boolean transmitting) {
+        RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+
+        when(remoteProcessGroup.getIdentifier()).thenReturn(identifier);
+        when(remoteProcessGroup.isTransmitting()).thenReturn(transmitting);
+
+        return remoteProcessGroup;
+    }
+
+    private RemoteProcessGroupDTO createRemoteProcessGroupDTO(String id, 
boolean transmitting) {
+        RemoteProcessGroupDTO remoteProcessGroup = new RemoteProcessGroupDTO();
+
+        remoteProcessGroup.setId(id);
+        remoteProcessGroup.setTransmitting(transmitting);
+
+        return remoteProcessGroup;
+    }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index d6341c0..5ae7945 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -652,11 +652,18 @@
                     'id': nfCanvasUtils.getGroupId(),
                     'state': 'RUNNING'
                 };
-
                 updateResource(config.urls.api + '/flow/process-groups/' + 
encodeURIComponent(nfCanvasUtils.getGroupId()), 
entity).done(updateProcessGroup);
+
+                var remoteProcessGroupEntity = {
+                    'state': 'TRANSMITTING'
+                };
+                updateResource(config.urls.api + 
'/remote-process-groups/process-group/' + 
encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', 
remoteProcessGroupEntity)
+                    .done(function (response) {
+                        nfRemoteProcessGroup.set(response.remoteProcessGroups);
+                    });
             } else {
                 var componentsToStart = selection.filter(function (d) {
-                    return nfCanvasUtils.isRunnable(d3.select(this));
+                    return nfCanvasUtils.isRunnable(d3.select(this)) || 
nfCanvasUtils.canStartTransmitting(d3.select(this));
                 });
 
                 // ensure there are startable components selected
@@ -675,6 +682,12 @@
                                 'id': d.id,
                                 'state': 'RUNNING'
                             }
+                        } else if 
(nfCanvasUtils.isRemoteProcessGroup(selected)) {
+                            uri = d.uri + '/run-status';
+                            entity = {
+                                'revision': nfClient.getRevision(d),
+                                'state': 'TRANSMITTING'
+                            };
                         } else {
                             uri = d.uri + '/run-status';
                             entity = {
@@ -683,13 +696,21 @@
                             };
                         }
 
-                        startRequests.push(updateResource(uri, 
entity).done(function (response) {
-                            if (nfCanvasUtils.isProcessGroup(selected)) {
+                        if (nfCanvasUtils.isProcessGroup(selected)) {
+                            var remoteProcessGroupEntity = {
+                                'state': 'TRANSMITTING'
+                            };
+                            var startRemoteProcessGroups = 
updateResource(config.urls.api + '/remote-process-groups/process-group/' + 
encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', 
remoteProcessGroupEntity);
+                            
startRequests.push(startRemoteProcessGroups.done(function (response) {}));
+
+                            startRequests.push(updateResource(uri, 
entity).done(function (response) {
                                 
nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
-                            } else {
+                            }));
+                        } else {
+                            startRequests.push(updateResource(uri, 
entity).done(function (response) {
                                 
nfCanvasUtils.getComponentByType(d.type).set(response);
-                            }
-                        }));
+                            }));
+                        }
                     });
 
                     // inform Angular app once the updates have completed
@@ -755,11 +776,18 @@
                     'id': nfCanvasUtils.getGroupId(),
                     'state': 'STOPPED'
                 };
-
                 updateResource(config.urls.api + '/flow/process-groups/' + 
encodeURIComponent(nfCanvasUtils.getGroupId()), 
entity).done(updateProcessGroup);
+
+                var remoteProcessGroupEntity = {
+                    'state': 'STOPPED'
+                };
+                updateResource(config.urls.api + 
'/remote-process-groups/process-group/' + 
encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', 
remoteProcessGroupEntity)
+                    .done(function (response) {
+                        nfRemoteProcessGroup.set(response.remoteProcessGroups);
+                    });
             } else {
                 var componentsToStop = selection.filter(function (d) {
-                    return nfCanvasUtils.isStoppable(d3.select(this));
+                    return nfCanvasUtils.isStoppable(d3.select(this)) || 
nfCanvasUtils.canStopTransmitting(d3.select(this));
                 });
 
                 // ensure there are some component to stop
@@ -786,13 +814,21 @@
                             };
                         }
 
-                        stopRequests.push(updateResource(uri, 
entity).done(function (response) {
-                            if (nfCanvasUtils.isProcessGroup(selected)) {
+                        if (nfCanvasUtils.isProcessGroup(selected)) {
+                            var remoteProcessGroupEntity = {
+                                'state': 'STOPPED'
+                            };
+                            var stopRemoteProcessGroups = 
updateResource(config.urls.api + '/remote-process-groups/process-group/' + 
encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', 
remoteProcessGroupEntity);
+                            
stopRequests.push(stopRemoteProcessGroups.done(function (response) {}));
+
+                            stopRequests.push(updateResource(uri, 
entity).done(function (response) {
                                 
nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
-                            } else {
+                            }));
+                        } else {
+                            stopRequests.push(updateResource(uri, 
entity).done(function (response) {
                                 
nfCanvasUtils.getComponentByType(d.type).set(response);
-                            }
-                        }));
+                            }));
+                        }
                     });
 
                     // inform Angular app once the updates have completed

Reply via email to