This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5e01d3de06 NIFI-10117: Allowing remote group ports to be synchronized
from Stand… (#6128)
5e01d3de06 is described below
commit 5e01d3de060bdf119580469819822e3d0e99e16f
Author: Joe Gresock <[email protected]>
AuthorDate: Fri Jun 17 14:28:14 2022 -0400
NIFI-10117: Allowing remote group ports to be synchronized from Stand…
(#6128)
NIFI-10117: Allowing remote group ports to be synchronized from
StandardVersionedComponentSynchronizer
---
.../StandardVersionedComponentSynchronizer.java | 34 +++++++++++++++++++++-
1 file changed, 33 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 04052be0a3..078d29cbdd 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -2704,6 +2704,9 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
rpg.setInputPorts(proposed.getInputPorts() == null ?
Collections.emptySet() : proposed.getInputPorts().stream()
.map(port -> createPortDescriptor(port, componentIdGenerator,
rpg.getIdentifier()))
.collect(Collectors.toSet()), false);
+
+ synchronizeRemoteGroupPorts(rpg.getInputPorts(),
proposed.getInputPorts());
+ synchronizeRemoteGroupPorts(rpg.getOutputPorts(),
proposed.getOutputPorts());
rpg.setName(proposed.getName());
rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
rpg.setOutputPorts(proposed.getOutputPorts() == null ?
Collections.emptySet() : proposed.getOutputPorts().stream()
@@ -2739,6 +2742,35 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
}
}
+ private void synchronizeRemoteGroupPorts(final Set<RemoteGroupPort>
remoteGroupPorts, final Set<VersionedRemoteGroupPort> proposedPorts) {
+ final Map<String, VersionedRemoteGroupPort> inputPortsByTargetId =
mapRemoteGroupPortsByTargetId(proposedPorts);
+ remoteGroupPorts.forEach(port -> {
+ final VersionedRemoteGroupPort proposedPort =
inputPortsByTargetId.get(port.getTargetIdentifier());
+ if (proposedPort != null) {
+ if (proposedPort.getBatchSize() != null) {
+ final BatchSize batchSize = proposedPort.getBatchSize();
+ port.setBatchSize(batchSize.getSize());
+ port.setBatchCount(batchSize.getCount());
+ port.setBatchDuration(batchSize.getDuration());
+ }
+ if (proposedPort.isUseCompression() != null) {
+ port.setUseCompression(proposedPort.isUseCompression());
+ }
+ if (proposedPort.getConcurrentlySchedulableTaskCount() !=
null) {
+
port.setMaxConcurrentTasks(proposedPort.getConcurrentlySchedulableTaskCount());
+ }
+ }
+ });
+ }
+
+ private Map<String, VersionedRemoteGroupPort>
mapRemoteGroupPortsByTargetId(final Set<VersionedRemoteGroupPort>
remoteGroupPorts) {
+ return remoteGroupPorts == null ? Collections.emptyMap() :
remoteGroupPorts.stream()
+ .collect(Collectors.toMap(
+ VersionedRemoteGroupPort::getTargetId,
+ Function.identity()
+ ));
+ }
+
private RemoteGroupPort getRpgInputPort(final VersionedRemoteGroupPort
port, final RemoteProcessGroup rpg, final ComponentIdGenerator
componentIdGenerator) {
return getRpgPort(port, rpg, componentIdGenerator, rpg::getInputPort,
rpg.getInputPorts());
}
@@ -3003,7 +3035,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
final Connectable source = getConnectable(destinationGroup,
proposed.getSource());
if (source == null) {
- throw new IllegalArgumentException("Connection has a source with
identifier " + proposed.getIdentifier()
+ throw new IllegalArgumentException("Connection has a source with
identifier " + proposed.getSource().getId()
+ " but no component could be found in the Process Group with
a corresponding identifier");
}