This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e01d3de06 NIFI-10117: Allowing remote group ports to be synchronized from Stand… (#6128)
5e01d3de06 is described below

commit 5e01d3de060bdf119580469819822e3d0e99e16f
Author: Joe Gresock <[email protected]>
AuthorDate: Fri Jun 17 14:28:14 2022 -0400

    NIFI-10117: Allowing remote group ports to be synchronized from Stand… (#6128)
    
    NIFI-10117: Allowing remote group ports to be synchronized from StandardVersionedComponentSynchronizer
---
 .../StandardVersionedComponentSynchronizer.java    | 34 +++++++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 04052be0a3..078d29cbdd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -2704,6 +2704,9 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
         rpg.setInputPorts(proposed.getInputPorts() == null ? 
Collections.emptySet() : proposed.getInputPorts().stream()
             .map(port -> createPortDescriptor(port, componentIdGenerator, 
rpg.getIdentifier()))
             .collect(Collectors.toSet()), false);
+
+        synchronizeRemoteGroupPorts(rpg.getInputPorts(), 
proposed.getInputPorts());
+        synchronizeRemoteGroupPorts(rpg.getOutputPorts(), 
proposed.getOutputPorts());
         rpg.setName(proposed.getName());
         rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
         rpg.setOutputPorts(proposed.getOutputPorts() == null ? 
Collections.emptySet() : proposed.getOutputPorts().stream()
@@ -2739,6 +2742,35 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
         }
     }
 
+    private void synchronizeRemoteGroupPorts(final Set<RemoteGroupPort> 
remoteGroupPorts, final Set<VersionedRemoteGroupPort> proposedPorts) {
+        final Map<String, VersionedRemoteGroupPort> inputPortsByTargetId = 
mapRemoteGroupPortsByTargetId(proposedPorts);
+        remoteGroupPorts.forEach(port -> {
+            final VersionedRemoteGroupPort proposedPort = 
inputPortsByTargetId.get(port.getTargetIdentifier());
+            if (proposedPort != null) {
+                if (proposedPort.getBatchSize() != null) {
+                    final BatchSize batchSize = proposedPort.getBatchSize();
+                    port.setBatchSize(batchSize.getSize());
+                    port.setBatchCount(batchSize.getCount());
+                    port.setBatchDuration(batchSize.getDuration());
+                }
+                if (proposedPort.isUseCompression() != null) {
+                    port.setUseCompression(proposedPort.isUseCompression());
+                }
+                if (proposedPort.getConcurrentlySchedulableTaskCount() != 
null) {
+                    
port.setMaxConcurrentTasks(proposedPort.getConcurrentlySchedulableTaskCount());
+                }
+            }
+        });
+    }
+
+    private Map<String, VersionedRemoteGroupPort> 
mapRemoteGroupPortsByTargetId(final Set<VersionedRemoteGroupPort> 
remoteGroupPorts) {
+        return remoteGroupPorts == null ? Collections.emptyMap() : 
remoteGroupPorts.stream()
+                .collect(Collectors.toMap(
+                        VersionedRemoteGroupPort::getTargetId,
+                        Function.identity()
+                ));
+    }
+
     private RemoteGroupPort getRpgInputPort(final VersionedRemoteGroupPort 
port, final RemoteProcessGroup rpg, final ComponentIdGenerator 
componentIdGenerator) {
         return getRpgPort(port, rpg, componentIdGenerator, rpg::getInputPort, 
rpg.getInputPorts());
     }
@@ -3003,7 +3035,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
 
         final Connectable source = getConnectable(destinationGroup, 
proposed.getSource());
         if (source == null) {
-            throw new IllegalArgumentException("Connection has a source with 
identifier " + proposed.getIdentifier()
+            throw new IllegalArgumentException("Connection has a source with 
identifier " + proposed.getSource().getId()
                 + " but no component could be found in the Process Group with 
a corresponding identifier");
         }
 

Reply via email to