NIFI-2493 This closes #798. Do not fingerprint Remote Ports' running state.
When synchronizing remote flow with local flow, start/stop remote group ports
as appropriate based on the inherited flow.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ed14bf22
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ed14bf22
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ed14bf22

Branch: refs/heads/master
Commit: ed14bf22e716e1d290ed03284699ae9421ed7fab
Parents: 8752d11
Author: Mark Payne <[email protected]>
Authored: Fri Aug 5 15:52:45 2016 -0400
Committer: joewitt <[email protected]>
Committed: Mon Aug 8 16:54:27 2016 -0400

----------------------------------------------------------------------
 .../controller/StandardFlowSynchronizer.java    | 50 ++++++++++++++++++++
 .../nifi/fingerprint/FingerprintFactory.java    |  3 +-
 2 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ed14bf22/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 0e0c74b..d1822ef 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -772,6 +772,56 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             }
         }
 
+        // Update scheduled state of Remote Group Ports
+        final List<Element> remoteProcessGroupList = 
getChildrenByTagName(processGroupElement, "remoteProcessGroup");
+        for (final Element remoteGroupElement : remoteProcessGroupList) {
+            final RemoteProcessGroupDTO remoteGroupDto = 
FlowFromDOMFactory.getRemoteProcessGroup(remoteGroupElement, encryptor);
+            final RemoteProcessGroup rpg = 
processGroup.getRemoteProcessGroup(remoteGroupDto.getId());
+
+            // input ports
+            final List<Element> inputPortElements = 
getChildrenByTagName(remoteGroupElement, "inputPort");
+            for (final Element inputPortElement : inputPortElements) {
+                final RemoteProcessGroupPortDescriptor portDescriptor = 
FlowFromDOMFactory.getRemoteProcessGroupPort(inputPortElement);
+                final String inputPortId = portDescriptor.getId();
+                final RemoteGroupPort inputPort = 
rpg.getInputPort(inputPortId);
+                if (inputPort == null) {
+                    continue;
+                }
+
+                if (portDescriptor.isTransmitting()) {
+                    if (inputPort.getScheduledState() != 
ScheduledState.RUNNING && inputPort.getScheduledState() != 
ScheduledState.STARTING) {
+                        rpg.startTransmitting(inputPort);
+                    }
+                } else {
+                    if (inputPort.getScheduledState() != 
ScheduledState.STOPPED && inputPort.getScheduledState() != 
ScheduledState.STOPPING) {
+                        rpg.stopTransmitting(inputPort);
+                    }
+                }
+            }
+
+            // output ports
+            final List<Element> outputPortElements = 
getChildrenByTagName(remoteGroupElement, "outputPort");
+            for (final Element outputPortElement : outputPortElements) {
+                final RemoteProcessGroupPortDescriptor portDescriptor = 
FlowFromDOMFactory.getRemoteProcessGroupPort(outputPortElement);
+                final String outputPortId = portDescriptor.getId();
+                final RemoteGroupPort outputPort = 
rpg.getOutputPort(outputPortId);
+                if (outputPort == null) {
+                    continue;
+                }
+
+                if (portDescriptor.isTransmitting()) {
+                    if (outputPort.getScheduledState() != 
ScheduledState.RUNNING && outputPort.getScheduledState() != 
ScheduledState.STARTING) {
+                        rpg.startTransmitting(outputPort);
+                    }
+                } else {
+                    if (outputPort.getScheduledState() != 
ScheduledState.STOPPED && outputPort.getScheduledState() != 
ScheduledState.STOPPING) {
+                        rpg.stopTransmitting(outputPort);
+                    }
+                }
+            }
+        }
+
+
         // add labels
         final List<Element> labelNodeList = 
getChildrenByTagName(processGroupElement, "label");
         for (final Element labelElement : labelNodeList) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed14bf22/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 26f83b5..20bdb60 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -778,7 +778,7 @@ public final class FingerprintFactory {
     }
 
     private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder 
builder, final Element remoteGroupPortElement) {
-        for (final String childName : new String[]{"id", "scheduledState", 
"maxConcurrentTasks", "useCompression"}) {
+        for (final String childName : new String[] {"id", 
"maxConcurrentTasks", "useCompression"}) {
             appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName));
         }
 
@@ -787,7 +787,6 @@ public final class FingerprintFactory {
 
     private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder 
builder, final RemoteProcessGroupPortDTO port) {
         builder.append(port.getId());
-        builder.append(Boolean.TRUE.equals(port.isTransmitting()) ? "RUNNING" 
: "STOPPED");
         builder.append(port.getConcurrentlySchedulableTaskCount());
         builder.append(port.getUseCompression());
         return builder;

Reply via email to