Repository: nifi
Updated Branches:
  refs/heads/master 336d3cf1f -> 61c6f0305


NIFI-4863: Bug fixes to the way that we handled Remote Group Ports when 
changing flow version / reverting local changes
- Everywhere that we ignore adding remote ports we should ignore removing 
remote ports as well in flow diffs

This closes #2462.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/61c6f030
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/61c6f030
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/61c6f030

Branch: refs/heads/master
Commit: 61c6f0305bc7b5c68456fc2d40e0536698821c47
Parents: 336d3cf
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Feb 9 14:09:23 2018 -0500
Committer: Bryan Bende <bbe...@apache.org>
Committed: Fri Feb 9 15:45:54 2018 -0500

----------------------------------------------------------------------
 .../nifi/groups/StandardProcessGroup.java       | 27 ++++---
 .../nifi/remote/StandardRemoteProcessGroup.java |  1 +
 .../apache/nifi/util/FlowDifferenceFilters.java |  8 +--
 .../nifi/util/TestFlowDifferenceFilters.java    |  8 +--
 .../org/apache/nifi/web/NiFiServiceFacade.java  | 12 ++++
 .../nifi/web/StandardNiFiServiceFacade.java     | 40 +++++++++--
 .../apache/nifi/web/api/VersionsResource.java   | 74 ++++++++++++++------
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  3 +-
 .../web/dao/impl/StandardProcessGroupDAO.java   | 14 ++--
 .../dao/impl/StandardRemoteProcessGroupDAO.java | 14 ++--
 10 files changed, 138 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 8b7dcd2..ae9b43f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -3270,11 +3270,6 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                     continue;
                 }
 
-                // Ignore differences for adding a remote port
-                if (FlowDifferenceFilters.isAddedRemotePort(diff)) {
-                    continue;
-                }
-
                 // If this update adds a new Controller Service, then we need 
to check if the service already exists at a higher level
                 // and if so compare our VersionedControllerService to the 
existing service.
                 if (diff.getDifferenceType() == 
DifferenceType.COMPONENT_ADDED) {
@@ -3909,9 +3904,17 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 }
 
                 final RemoteProcessGroup rpg = rpgOption.get();
-                return rpg.getInputPorts().stream()
+                final Optional<RemoteGroupPort> portByIdOption = 
rpg.getInputPorts().stream()
                     .filter(component -> 
component.getVersionedComponentId().isPresent())
                     .filter(component -> 
id.equals(component.getVersionedComponentId().get()))
+                    .findAny();
+
+                if (portByIdOption.isPresent()) {
+                    return portByIdOption.get();
+                }
+
+                return rpg.getInputPorts().stream()
+                    .filter(component -> 
connectableComponent.getName().equals(component.getName()))
                     .findAny()
                     .orElse(null);
             }
@@ -3928,9 +3931,17 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 }
 
                 final RemoteProcessGroup rpg = rpgOption.get();
-                return rpg.getOutputPorts().stream()
+                final Optional<RemoteGroupPort> portByIdOption = 
rpg.getOutputPorts().stream()
                     .filter(component -> 
component.getVersionedComponentId().isPresent())
                     .filter(component -> 
id.equals(component.getVersionedComponentId().get()))
+                    .findAny();
+
+                if (portByIdOption.isPresent()) {
+                    return portByIdOption.get();
+                }
+
+                return rpg.getOutputPorts().stream()
+                    .filter(component -> 
connectableComponent.getName().equals(component.getName()))
                     .findAny()
                     .orElse(null);
             }
@@ -4216,7 +4227,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         final FlowComparison comparison = flowComparator.compare();
         final Set<FlowDifference> differences = 
comparison.getDifferences().stream()
                 .filter(difference -> difference.getDifferenceType() != 
DifferenceType.BUNDLE_CHANGED)
-                .filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS)
+                
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
                 .collect(Collectors.toCollection(HashSet::new));
 
         LOG.debug("There are {} differences between this Local Flow and the 
Versioned Flow: {}", differences.size(), differences);

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 0b9c6f2..c4621e6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -672,6 +672,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
                 port.setBatchDuration(descriptor.getBatchDuration());
             }
+            port.setVersionedComponentId(descriptor.getVersionedComponentId());
 
             return port;
         } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index ca48b99..4d341e9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -29,12 +29,12 @@ public class FlowDifferenceFilters {
     /**
      * Predicate that returns true if the difference is NOT a remote port 
being added, and false if it is.
      */
-    public static Predicate<FlowDifference> FILTER_ADDED_REMOTE_PORTS =  (fd) 
-> {
-        return !isAddedRemotePort(fd);
+    public static Predicate<FlowDifference> FILTER_ADDED_REMOVED_REMOTE_PORTS 
=  (fd) -> {
+        return !isAddedOrRemovedRemotePort(fd);
     };
 
-    public static boolean isAddedRemotePort(final FlowDifference fd) {
-        if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
+    public static boolean isAddedOrRemovedRemotePort(final FlowDifference fd) {
+        if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED || 
fd.getDifferenceType() == DifferenceType.COMPONENT_REMOVED) {
             VersionedComponent component = fd.getComponentA();
             if (component == null || fd.getComponentB() instanceof 
InstantiatedVersionedComponent) {
                 component = fd.getComponentB();

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
index ee65816..9cc8b48 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
@@ -35,7 +35,7 @@ public class TestFlowDifferenceFilters {
                 DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, 
null, "");
 
         // predicate should return false because we don't want to include 
changes for adding a remote input port
-        
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
+        
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference));
     }
 
     @Test
@@ -47,7 +47,7 @@ public class TestFlowDifferenceFilters {
                 DifferenceType.COMPONENT_ADDED, remoteGroupPort, null, null, 
null, "");
 
         // predicate should return false because we don't want to include 
changes for adding a remote input port
-        
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
+        
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference));
     }
 
     @Test
@@ -59,7 +59,7 @@ public class TestFlowDifferenceFilters {
                 DifferenceType.COMPONENT_ADDED, null, remoteGroupPort, null, 
null, "");
 
         // predicate should return false because we don't want to include 
changes for adding a remote input port
-        
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
+        
Assert.assertFalse(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference));
     }
 
     @Test
@@ -71,6 +71,6 @@ public class TestFlowDifferenceFilters {
                 DifferenceType.COMPONENT_ADDED, null, versionedProcessor, 
null, null, "");
 
         // predicate should return true because we do want to include changes 
for adding a non-port
-        
Assert.assertTrue(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS.test(flowDifference));
+        
Assert.assertTrue(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(flowDifference));
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 165af45..009b096 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1118,6 +1118,18 @@ public interface NiFiServiceFacade {
     RemoteProcessGroupEntity createRemoteProcessGroup(Revision revision, 
String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO);
 
     /**
+     * Determines whether or not the Remote Port with the given remotePortId 
is connected (i.e., there are incoming Connections if it's an Input Port
+     * or there are outgoing Connections if it's an Output Port).
+     *
+     * @param remoteProcessGroupId the ID of the Remote Process Group
+     * @param remotePortId the ID of the Port
+     * @return <code>true</code> if remote port identified is connected, 
<code>false</code> if the port is not connected
+     *
+     * @throws ResourceNotFoundException if the port cannot be found
+     */
+    boolean isRemoteGroupPortConnected(String remoteProcessGroupId, String 
remotePortId);
+
+    /**
      * Gets a remote process group.
      *
      * @param remoteProcessGroupId The id of the remote process group

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index bf794cf..f57f628 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -1854,6 +1854,22 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public boolean isRemoteGroupPortConnected(final String 
remoteProcessGroupId, final String remotePortId) {
+        final RemoteProcessGroup rpg = 
remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
+        RemoteGroupPort port = rpg.getInputPort(remotePortId);
+        if (port != null) {
+            return port.hasIncomingConnection();
+        }
+
+        port = rpg.getOutputPort(remotePortId);
+        if (port != null) {
+            return !port.getConnections().isEmpty();
+        }
+
+        throw new ResourceNotFoundException("Could not find Port with ID " + 
remotePortId + " as a child of RemoteProcessGroup with ID " + 
remoteProcessGroupId);
+    }
+
+    @Override
     public void verifyCanAddTemplate(String groupId, String name) {
         templateDAO.verifyCanAddTemplate(name, groupId);
     }
@@ -3966,7 +3982,7 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         final Set<AffectedComponentEntity> affectedComponents = 
comparison.getDifferences().stream()
             .filter(difference -> difference.getDifferenceType() != 
DifferenceType.COMPONENT_ADDED) // components that are added are not components 
that will be affected in the local flow.
             .filter(difference -> difference.getDifferenceType() != 
DifferenceType.BUNDLE_CHANGED)
-            .filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS)
+            .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
             .map(difference -> {
                 final VersionedComponent localComponent = 
difference.getComponentA();
 
@@ -4004,7 +4020,7 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
             }
 
             // Ignore differences for adding remote ports
-            if (FlowDifferenceFilters.isAddedRemotePort(difference)) {
+            if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) {
                 continue;
             }
 
@@ -4105,12 +4121,20 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
 
     private void mapToConnectableId(final Collection<? extends Connectable> 
connectables, final Map<String, List<Connectable>> destination) {
         for (final Connectable connectable : connectables) {
-            final Optional<String> versionedId = 
connectable.getVersionedComponentId();
-            if (!versionedId.isPresent()) {
-                continue;
+            final Optional<String> versionedIdOption = 
connectable.getVersionedComponentId();
+
+            // Determine the Versioned ID by using the ID that is assigned, if 
one is. Otherwise,
+            // we will calculate the Versioned ID. This allows us to map 
connectables that currently are not under
+            // version control. We have to do this so that if we are changing 
flow versions and have a component that is running and it does not exist
+            // in the Versioned Flow, we still need to be able to create an 
AffectedComponentDTO for it.
+            final String versionedId;
+            if (versionedIdOption.isPresent()) {
+                versionedId = versionedIdOption.get();
+            } else {
+                versionedId = 
UUID.nameUUIDFromBytes(connectable.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
             }
 
-            final List<Connectable> byVersionedId = 
destination.computeIfAbsent(versionedId.get(), key -> new ArrayList<>());
+            final List<Connectable> byVersionedId = 
destination.computeIfAbsent(versionedId, key -> new ArrayList<>());
             byVersionedId.add(connectable);
         }
     }
@@ -4128,9 +4152,11 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         final AffectedComponentDTO dto = new AffectedComponentDTO();
         dto.setId(connectable.getIdentifier());
         dto.setReferenceType(connectable.getConnectableType().name());
-        dto.setProcessGroupId(connectable.getProcessGroupIdentifier());
         dto.setState(connectable.getScheduledState().name());
 
+        final String groupId = connectable instanceof RemoteGroupPort ? 
((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() : 
connectable.getProcessGroupIdentifier();
+        dto.setProcessGroupId(groupId);
+
         entity.setComponent(dto);
         return entity;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 8028bd9..7a96ebf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -17,12 +17,22 @@
 
 package org.apache.nifi.web.api;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.Authorizer;
@@ -75,21 +85,6 @@ import org.apache.nifi.web.util.LifecycleManagementException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -105,6 +100,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+
 @Path("/versions")
 @Api(value = "/versions", description = "Endpoint for managing version control 
for a flow")
 public class VersionsResource extends ApplicationResource {
@@ -1491,6 +1493,36 @@ public class VersionsResource extends 
ApplicationResource {
 
                 // Step 14. Restart all components
                 final Set<AffectedComponentEntity> componentsToStart = 
getUpdatedEntities(runningComponents, user);
+
+                // If there are any Remote Group Ports that are supposed to be 
started and have no connections, we want to remove those from our Set.
+                // This will happen if the Remote Group Port is transmitting 
when the version change happens but the new flow version does not have
+                // a connection to the port. In such a case, the Port still is 
included in the Updated Entities because we do not remove them
+                // when updating the flow (they are removed in the background).
+                final Set<AffectedComponentEntity> avoidStarting = new 
HashSet<>();
+                for (final AffectedComponentEntity componentEntity : 
componentsToStart) {
+                    final AffectedComponentDTO componentDto = 
componentEntity.getComponent();
+                    final String referenceType = 
componentDto.getReferenceType();
+                    if 
(!AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT.equals(referenceType)
+                        && 
!AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT.equals(referenceType)) {
+                        continue;
+                    }
+
+                    boolean startComponent;
+                    try {
+                        startComponent = 
serviceFacade.isRemoteGroupPortConnected(componentDto.getProcessGroupId(), 
componentDto.getId());
+                    } catch (final ResourceNotFoundException rnfe) {
+                        // Could occur if RPG is refreshed at just the right 
time.
+                        startComponent = false;
+                    }
+
+                    // We must add the components to avoid starting to a 
separate Set and then remove them below,
+                    // rather than removing the component here, because doing 
so would result in a ConcurrentModificationException.
+                    if (!startComponent) {
+                        avoidStarting.add(componentEntity);
+                    }
+                }
+                componentsToStart.removeAll(avoidStarting);
+
                 final CancellableTimedPause startComponentsPause = new 
CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                 asyncRequest.setCancelCallback(startComponentsPause::cancel);
                 logger.info("Restarting {} Processors", 
componentsToStart.size());

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 2ac31fb..359b524 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1535,6 +1535,7 @@ public final class DtoFactory {
 
         final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO();
         dto.setId(port.getIdentifier());
+        dto.setGroupId(port.getRemoteProcessGroup().getIdentifier());
         dto.setTargetId(port.getTargetIdentifier());
         dto.setName(port.getName());
         dto.setComments(port.getComments());
@@ -2227,7 +2228,7 @@ public final class DtoFactory {
             }
 
             // Ignore differences for adding remote ports
-            if (FlowDifferenceFilters.isAddedRemotePort(difference)) {
+            if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) {
                 continue;
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index e1d9e69..4744324 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -167,12 +167,9 @@ public class StandardProcessGroupDAO extends ComponentDAO 
implements ProcessGrou
                         connectable.getProcessGroup().startOutputPort((Port) 
connectable);
                         break;
                     case REMOTE_INPUT_PORT:
-                        final RemoteGroupPort remoteInputPort = 
group.findRemoteGroupPort(componentId);
-                        
remoteInputPort.getRemoteProcessGroup().startTransmitting(remoteInputPort);
-                        break;
                     case REMOTE_OUTPUT_PORT:
-                        final RemoteGroupPort remoteOutputPort = 
group.findRemoteGroupPort(componentId);
-                        
remoteOutputPort.getRemoteProcessGroup().startTransmitting(remoteOutputPort);
+                        final RemoteGroupPort remotePort = 
group.findRemoteGroupPort(componentId);
+                        
remotePort.getRemoteProcessGroup().startTransmitting(remotePort);
                         break;
                 }
             } else {
@@ -188,12 +185,9 @@ public class StandardProcessGroupDAO extends ComponentDAO 
implements ProcessGrou
                         connectable.getProcessGroup().stopOutputPort((Port) 
connectable);
                         break;
                     case REMOTE_INPUT_PORT:
-                        final RemoteGroupPort remoteInputPort = 
group.findRemoteGroupPort(componentId);
-                        
remoteInputPort.getRemoteProcessGroup().stopTransmitting(remoteInputPort);
-                        break;
                     case REMOTE_OUTPUT_PORT:
-                        final RemoteGroupPort remoteOutputPort = 
group.findRemoteGroupPort(componentId);
-                        
remoteOutputPort.getRemoteProcessGroup().stopTransmitting(remoteOutputPort);
+                        final RemoteGroupPort remotePort = 
group.findRemoteGroupPort(componentId);
+                        
remotePort.getRemoteProcessGroup().stopTransmitting(remotePort);
                         break;
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/61c6f030/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index 8228a77..05a983c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -18,11 +18,6 @@ package org.apache.nifi.web.dao.impl;
 
 import static org.apache.nifi.util.StringUtils.isEmpty;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.controller.FlowController;
@@ -39,12 +34,14 @@ import org.apache.nifi.web.api.dto.DtoFactory;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
 
 public class StandardRemoteProcessGroupDAO extends ComponentDAO implements 
RemoteProcessGroupDAO {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(StandardRemoteProcessGroupDAO.class);
     private FlowController flowController;
 
     private RemoteProcessGroup locateRemoteProcessGroup(final String 
remoteProcessGroupId) {
@@ -129,6 +126,7 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
     /**
      * Verifies the specified remote group can be updated, if necessary.
      */
+    @SuppressWarnings("unchecked")
     private void verifyUpdate(RemoteProcessGroup remoteProcessGroup, 
RemoteProcessGroupDTO remoteProcessGroupDto) {
         // see if the remote process group can start/stop transmitting
         if (isNotNull(remoteProcessGroupDto.isTransmitting())) {

Reply via email to