NIFI-4436: More intelligently flag a ProcessGroup to indicate whether or not it has any local modifications compared to Versioned Flow - Bug fixes - Updated the status of a Versioned Process Group to include VersionedFlowState and explanation
Signed-off-by: Matt Gilman <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fdef5b56 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fdef5b56 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fdef5b56 Branch: refs/heads/master Commit: fdef5b560544a8da33068b4acb6d4404fe193ed9 Parents: d34fb5e Author: Mark Payne <[email protected]> Authored: Tue Nov 28 12:33:00 2017 -0500 Committer: Bryan Bende <[email protected]> Committed: Mon Jan 8 12:44:54 2018 -0500 ---------------------------------------------------------------------- .../api/dto/VersionControlInformationDTO.java | 22 ++ .../service/ControllerServiceNode.java | 3 +- .../org/apache/nifi/groups/ProcessGroup.java | 13 +- .../apache/nifi/groups/RemoteProcessGroup.java | 9 +- .../flow/VersionControlInformation.java | 5 + .../nifi/registry/flow/VersionedFlowState.java | 52 +++ .../nifi/registry/flow/VersionedFlowStatus.java | 31 ++ .../apache/nifi/controller/FlowController.java | 76 ++-- .../controller/StandardFlowSynchronizer.java | 5 +- .../nifi/groups/StandardProcessGroup.java | 382 +++++++++++++++++-- .../groups/StandardVersionedFlowStatus.java | 50 +++ .../flow/StandardVersionControlInformation.java | 17 +- .../flow/mapping/NiFiRegistryDtoMapper.java | 328 ---------------- .../flow/mapping/NiFiRegistryFlowMapper.java | 97 +++-- .../nifi/remote/StandardRemoteProcessGroup.java | 126 ++---- .../service/mock/MockProcessGroup.java | 6 +- .../nifi/web/StandardNiFiServiceFacade.java | 31 +- .../nifi/web/api/ProcessGroupResource.java | 2 +- .../org/apache/nifi/web/api/dto/DtoFactory.java | 29 +- .../nifi/web/controller/ControllerFacade.java | 5 + .../dao/impl/StandardControllerServiceDAO.java | 20 +- .../nifi/web/dao/impl/StandardFunnelDAO.java | 2 + .../nifi/web/dao/impl/StandardInputPortDAO.java | 1 + .../nifi/web/dao/impl/StandardLabelDAO.java | 1 + .../web/dao/impl/StandardOutputPortDAO.java | 1 + 
.../web/dao/impl/StandardProcessGroupDAO.java | 13 +- .../nifi/web/dao/impl/StandardProcessorDAO.java | 1 + .../dao/impl/StandardRemoteProcessGroupDAO.java | 5 +- .../nifi/web/util/AffectedComponentUtils.java | 4 + .../ClusterReplicationComponentLifecycle.java | 6 +- 30 files changed, 751 insertions(+), 592 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java index c31a957..944b10a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java @@ -34,6 +34,8 @@ public class VersionControlInformationDTO { private Integer version; private Boolean modified; private Boolean current; + private String state; + private String stateExplanation; @ApiModelProperty("The ID of the Process Group that is under version control") public String getGroupId() { @@ -135,4 +137,24 @@ public class VersionControlInformationDTO { public void setCurrent(Boolean current) { this.current = current; } + + @ApiModelProperty(readOnly = true, + value = "The current state of the Process Group, as it relates to the Versioned Flow", + allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE") + public 
String getState() { + return state; + } + + public void setState(final String state) { + this.state = state; + } + + @ApiModelProperty(readOnly = true, value = "Explanation of why the group is in the specified state") + public String getStateExplanation() { + return stateExplanation; + } + + public void setStateExplanation(String explanation) { + this.stateExplanation = explanation; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 2f28963..2219d6d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.service; +import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; @@ -27,7 +28,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -public interface ControllerServiceNode extends ConfiguredComponent, VersionedComponent { +public interface ControllerServiceNode extends ConfiguredComponent, ConfigurableComponent, VersionedComponent { /** * @return the 
Process Group that this Controller Service belongs to, or <code>null</code> if the Controller Service http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index d81b7d3..17131dd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -462,11 +462,11 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi /** * @param id of the Controller Service - * @return the Controller Service with the given ID, if it exists as a child or - * descendant of this ProcessGroup. 
This performs a recursive search of all - * descendant ProcessGroups + * @param includeDescendantGroups whether or not to include descendant process groups + * @param includeAncestorGroups whether or not to include ancestor process groups + * @return the Controller Service with the given ID */ - ControllerServiceNode findControllerService(String id); + ControllerServiceNode findControllerService(String id, boolean includeDescendantGroups, boolean includeAncestorGroups); /** * @return a List of all Controller Services contained within this ProcessGroup and any child Process Groups @@ -976,4 +976,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @param flowRegistry the Flow Registry to synchronize with */ void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry); + + /** + * Called whenever a component within this group or the group itself is modified + */ + void onComponentModified(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 0dd6070..7d92246 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -61,9 +61,9 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, void setName(String name); - void 
setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports); + void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports, boolean pruneUnusedPorts); - void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports); + void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports, boolean pruneUnusedPorts); Set<RemoteGroupPort> getInputPorts(); @@ -216,11 +216,6 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, void reinitialize(boolean isClustered); /** - * Removes all non existent ports from this RemoteProcessGroup. - */ - void removeAllNonExistentPorts(); - - /** * Removes a port that no longer exists on the remote instance from this * RemoteProcessGroup * http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java index b54a1c9..1f65a19 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java @@ -77,6 +77,11 @@ public interface VersionControlInformation { boolean isCurrent(); /** + * @return the current status of the Process Group as it relates to the associated Versioned Flow. 
+ */ + VersionedFlowStatus getStatus(); + + /** * @return the snapshot of the flow that was synchronized with the Flow Registry */ VersionedProcessGroup getFlowSnapshot(); http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java new file mode 100644 index 0000000..d20a13f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.registry.flow; + +public enum VersionedFlowState { + + /** + * We are unable to communicate with the Flow Registry in order to determine the appropriate state + */ + SYNC_FAILURE, + + /** + * This Process Group (or a child/descendant Process Group that is not itself under Version Control) + * is on the latest version of the Versioned Flow, but is different than the Versioned Flow that is + * stored in the Flow Registry. + */ + LOCALLY_MODIFIED, + + /** + * This Process Group has not been modified since it was last synchronized with the Flow Registry, but + * the Flow Registry has a newer version of the flow than what is contained in this Process Group. + */ + STALE, + + /** + * This Process Group (or a child/descendant Process Group that is not itself under Version Control) + * has been modified since it was last synchronized with the Flow Registry, and the Flow Registry has + * a newer version of the flow than what is contained in this Process Group. + */ + LOCALLY_MODIFIED_AND_STALE, + + /** + * This Process Group and all child/descendant Process Groups are on the latest version of the flow in + * the Flow Registry and have no local modifications. 
+ */ + UP_TO_DATE; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java new file mode 100644 index 0000000..9b58d9a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +public interface VersionedFlowStatus { + + /** + * @return the current state of the versioned process group + */ + VersionedFlowState getState(); + + /** + * @return an explanation of why the process group is in the state that it is in. 
+ */ + String getStateExplanation(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 3909387..2afa9dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,6 +16,39 @@ */ package org.apache.nifi.controller; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import 
java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; + import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -169,7 +202,6 @@ import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedProcessGroup; -import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.remote.HttpRemoteSiteListener; @@ -225,38 +257,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - 
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent { @@ -1983,14 +1983,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (remoteGroupDTO.getContents() != null) { final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents(); - // ensure there input ports + // ensure there are input ports if (contents.getInputPorts() != null) { - remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts())); + remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()), false); } // ensure there are output ports if (contents.getOutputPorts() != null) { - remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts())); + remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()), false); } } @@ -2035,12 +2035,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R instantiateSnippet(childGroup, childTemplateDTO, false); if (groupDTO.getVersionControlInformation() != null) { - final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(childGroup, getFlowRegistryClient(), false); - final VersionControlInformation vci = StandardVersionControlInformation.Builder .fromDto(groupDTO.getVersionControlInformation()) - .flowSnapshot(versionedGroup) .build(); childGroup.setVersionControlInformation(vci, Collections.emptyMap()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 71a587c..28d9b79 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -931,6 +931,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final Label label = controller.createLabel(labelDTO.getId(), labelDTO.getLabel()); label.setStyle(labelDTO.getStyle()); label.setPosition(new Position(labelDTO.getPosition().getX(), labelDTO.getPosition().getY())); + label.setVersionedComponentId(labelDTO.getVersionedComponentId()); if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) { label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight())); } @@ -1327,13 +1328,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "inputPort")) { inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement)); } - remoteGroup.setInputPorts(inputPorts); + remoteGroup.setInputPorts(inputPorts, false); final Set<RemoteProcessGroupPortDescriptor> outputPorts = new HashSet<>(); for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "outputPort")) { outputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement)); } - remoteGroup.setOutputPorts(outputPorts); + remoteGroup.setOutputPorts(outputPorts, false); processGroup.addRemoteProcessGroup(remoteGroup); for (final RemoteProcessGroupPortDescriptor remoteGroupPortDTO : outputPorts) { 
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 9a14464..4b186a9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -83,11 +83,14 @@ import org.apache.nifi.registry.flow.VersionedControllerService; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; import org.apache.nifi.registry.flow.VersionedFunnel; import org.apache.nifi.registry.flow.VersionedLabel; import org.apache.nifi.registry.flow.VersionedPort; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.VersionedPropertyDescriptor; import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; @@ -166,6 +169,8 @@ public final class StandardProcessGroup implements ProcessGroup { private final Map<String, Template> templates = new HashMap<>(); private final StringEncryptor encryptor; private final 
MutableVariableRegistry variableRegistry; + private final AtomicReference<StandardVersionedFlowStatus> flowStatus = new AtomicReference<>( + new StandardVersionedFlowStatus(null, "Not yet synchronized with Flow Registry", null)); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -494,6 +499,7 @@ public final class StandardProcessGroup implements ProcessGroup { port.setProcessGroup(this); inputPorts.put(requireNonNull(port).getIdentifier(), port); flowController.onInputPortAdded(port); + onComponentModified(); } finally { writeLock.unlock(); } @@ -528,6 +534,8 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group"); } + onComponentModified(); + flowController.onInputPortRemoved(port); LOG.info("Input Port {} removed from flow", port); } finally { @@ -575,6 +583,7 @@ public final class StandardProcessGroup implements ProcessGroup { port.setProcessGroup(this); outputPorts.put(port.getIdentifier(), port); flowController.onOutputPortAdded(port); + onComponentModified(); } finally { writeLock.unlock(); } @@ -600,6 +609,8 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group"); } + onComponentModified(); + flowController.onOutputPortRemoved(port); LOG.info("Output Port {} removed from flow", port); } finally { @@ -640,6 +651,7 @@ public final class StandardProcessGroup implements ProcessGroup { processGroups.put(Objects.requireNonNull(group).getIdentifier(), group); flowController.onProcessGroupAdded(group); + onComponentModified(); } finally { writeLock.unlock(); } @@ -679,6 +691,8 @@ public final class StandardProcessGroup implements ProcessGroup { removeComponents(group); processGroups.remove(group.getIdentifier()); + onComponentModified(); + 
flowController.onProcessGroupRemoved(group); LOG.info("{} removed from flow", group); } finally { @@ -734,6 +748,7 @@ public final class StandardProcessGroup implements ProcessGroup { remoteGroup.setProcessGroup(this); remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), remoteGroup); + onComponentModified(); } finally { writeLock.unlock(); } @@ -767,6 +782,8 @@ public final class StandardProcessGroup implements ProcessGroup { } } + onComponentModified(); + for (final RemoteGroupPort port : remoteGroup.getOutputPorts()) { // must copy to avoid a concurrent modification final Set<Connection> copy = new HashSet<>(port.getConnections()); @@ -802,6 +819,7 @@ public final class StandardProcessGroup implements ProcessGroup { processor.getVariableRegistry().setParent(getVariableRegistry()); processors.put(processorId, processor); flowController.onProcessorAdded(processor); + onComponentModified(); } finally { writeLock.unlock(); } @@ -843,6 +861,8 @@ public final class StandardProcessGroup implements ProcessGroup { } processors.remove(id); + onComponentModified(); + flowController.onProcessorRemoved(processor); LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers(); @@ -912,6 +932,7 @@ public final class StandardProcessGroup implements ProcessGroup { writeLock.lock(); try { connections.put(connection.getIdentifier(), connection); + onComponentModified(); connection.setProcessGroup(this); } finally { writeLock.unlock(); @@ -983,6 +1004,7 @@ public final class StandardProcessGroup implements ProcessGroup { } connections.put(connection.getIdentifier(), connection); flowController.onConnectionAdded(connection); + onComponentModified(); } finally { writeLock.unlock(); } @@ -1042,6 +1064,8 @@ public final class StandardProcessGroup implements ProcessGroup { // remove the connection from our map connections.remove(connection.getIdentifier()); LOG.info("{} removed from flow", connection); + onComponentModified(); + 
flowController.onConnectionRemoved(connection); } finally { writeLock.unlock(); @@ -1109,6 +1133,7 @@ public final class StandardProcessGroup implements ProcessGroup { label.setProcessGroup(this); labels.put(label.getIdentifier(), label); + onComponentModified(); } finally { writeLock.unlock(); } @@ -1123,6 +1148,7 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(label + " is not a member of this Process Group."); } + onComponentModified(); LOG.info("Label with ID {} removed from flow", label.getIdentifier()); } finally { writeLock.unlock(); @@ -1828,6 +1854,8 @@ public final class StandardProcessGroup implements ProcessGroup { if (autoStart) { startFunnel(funnel); } + + onComponentModified(); } finally { writeLock.unlock(); } @@ -1859,18 +1887,43 @@ public final class StandardProcessGroup implements ProcessGroup { @Override - public ControllerServiceNode findControllerService(final String id) { - return findControllerService(id, this); + public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) { + ControllerServiceNode serviceNode; + if (includeDescendants) { + serviceNode = findDescendantControllerService(id, this); + } else { + serviceNode = getControllerService(id); + } + + if (serviceNode == null && includeAncestors) { + serviceNode = findAncestorControllerService(id, getParent()); + } + + return serviceNode; + } + + private ControllerServiceNode findAncestorControllerService(final String id, final ProcessGroup start) { + if (start == null) { + return null; + } + + final ControllerServiceNode serviceNode = start.getControllerService(id); + if (serviceNode != null) { + return serviceNode; + } + + final ProcessGroup parent = start.getParent(); + return findAncestorControllerService(id, parent); } - private ControllerServiceNode findControllerService(final String id, final ProcessGroup start) { + private ControllerServiceNode 
findDescendantControllerService(final String id, final ProcessGroup start) { ControllerServiceNode service = start.getControllerService(id); if (service != null) { return service; } for (final ProcessGroup group : start.getProcessGroups()) { - service = findControllerService(id, group); + service = findDescendantControllerService(id, group); if (service != null) { return service; } @@ -1916,6 +1969,8 @@ public final class StandardProcessGroup implements ProcessGroup { } funnels.remove(funnel.getIdentifier()); + onComponentModified(); + flowController.onFunnelRemoved(funnel); LOG.info("{} removed from flow", funnel); } finally { @@ -1947,6 +2002,7 @@ public final class StandardProcessGroup implements ProcessGroup { service.getVariableRegistry().setParent(getVariableRegistry()); this.controllerServices.put(service.getIdentifier(), service); LOG.info("{} added to {}", service, this); + onComponentModified(); } finally { writeLock.unlock(); } @@ -2010,6 +2066,21 @@ public final class StandardProcessGroup implements ProcessGroup { } controllerServices.remove(service.getIdentifier()); + onComponentModified(); + + // For any component that references this Controller Service, find the component's Process Group + // and notify the Process Group that a component has been modified. This way, we know to re-calculate + // whether or not the Process Group has local modifications. 
+ service.getReferences().getReferencingComponents().stream() + .map(ConfiguredComponent::getProcessGroupIdentifier) + .filter(id -> !id.equals(getIdentifier())) + .forEach(groupId -> { + final ProcessGroup descendant = findProcessGroup(groupId); + if (descendant != null) { + descendant.onComponentModified(); + } + }); + flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier()); removed = true; @@ -2043,6 +2114,7 @@ public final class StandardProcessGroup implements ProcessGroup { templates.put(id, template); template.setProcessGroup(this); LOG.info("{} added to {}", template, this); + onComponentModified(); } finally { writeLock.unlock(); } @@ -2112,6 +2184,8 @@ public final class StandardProcessGroup implements ProcessGroup { } templates.remove(template.getIdentifier()); + onComponentModified(); + LOG.info("{} removed from flow", template); } finally { writeLock.unlock(); @@ -2172,6 +2246,8 @@ public final class StandardProcessGroup implements ProcessGroup { toRemove.verifyCanDelete(true); } + onComponentModified(); + for (final String id : connectionIdsToRemove) { removeConnection(connections.get(id)); } @@ -2224,6 +2300,8 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException("Cannot move Ports into the root group"); } + onComponentModified(); + for (final String id : getKeys(snippet.getInputPorts())) { destination.addInputPort(inputPorts.remove(id)); } @@ -2845,6 +2923,34 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override + public void onComponentModified() { + // We no longer know if or how the Process Group has changed, so the next time that we + // get the local modifications, we must re-calculate it. We cannot simply assume that + // the flow was modified now, because if a Processor Property changed from 'A' to 'B', + // then back to 'A', then we have to know that it was not modified. 
So we set it to null + // to indicate that we must calculate the local modifications. + final StandardVersionControlInformation svci = this.versionControlInfo.get(); + if (svci == null) { + // This group is not under version control directly. Notify parent. + final ProcessGroup parentGroup = parent.get(); + if (parentGroup != null) { + parentGroup.onComponentModified(); + } + } + + clearFlowDifferences(); + } + + private void clearFlowDifferences() { + boolean updated = false; + while (!updated) { + final StandardVersionedFlowStatus status = flowStatus.get(); + final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), null); + updated = flowStatus.compareAndSet(status, updatedStatus); + } + } + + @Override public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) { final StandardVersionControlInformation svci = new StandardVersionControlInformation( versionControlInformation.getRegistryIdentifier(), @@ -2854,16 +2960,63 @@ public final class StandardProcessGroup implements ProcessGroup { versionControlInformation.getVersion(), stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true), versionControlInformation.isModified(), - versionControlInformation.isCurrent()) { + versionControlInformation.isCurrent(), + versionControlInformation.getStatus()) { @Override public boolean isModified() { - final Set<FlowDifference> differences = StandardProcessGroup.this.getModifications(); - if (differences == null) { - return false; + boolean updated = false; + while (true) { + final StandardVersionedFlowStatus status = flowStatus.get(); + Set<FlowDifference> differences = status.getCurrentDifferences(); + if (differences == null) { + differences = getModifications(); + if (differences == null) { + return false; + } + + final StandardVersionedFlowStatus updatedStatus = new 
StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), differences); + updated = flowStatus.compareAndSet(status, updatedStatus); + + if (updated) { + return !differences.isEmpty(); + } + + continue; + } + + return !differences.isEmpty(); + } + } + + @Override + public VersionedFlowStatus getStatus() { + // If current state is a sync failure, then return that status as-is + final StandardVersionedFlowStatus status = flowStatus.get(); + final VersionedFlowState state = status.getState(); + if (state == VersionedFlowState.SYNC_FAILURE) { + return status; } - return !differences.isEmpty(); + final boolean modified = isModified(); + if (!modified) { + final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get(); + if (vci.getFlowSnapshot() == null) { + return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry", null); + } + } + + final boolean stale = !isCurrent(); + + if (modified && stale) { + return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE, "Local changes have been made and a newer version of this flow is available", null); + } else if (modified) { + return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED, "Local changes have been made", null); + } else if (stale) { + return new StandardVersionedFlowStatus(VersionedFlowState.STALE, "A newer version of this flow is available", null); + } else { + return new StandardVersionedFlowStatus(VersionedFlowState.UP_TO_DATE, "Flow version is current", null); + } } }; @@ -2875,6 +3028,7 @@ public final class StandardProcessGroup implements ProcessGroup { try { updateVersionedComponentIds(this, versionedComponentIds); this.versionControlInfo.set(svci); + clearFlowDifferences(); } finally { writeLock.unlock(); } @@ -2901,6 +3055,7 @@ public final class StandardProcessGroup implements ProcessGroup { copy.setProcessors(processGroup.getProcessors()); 
copy.setRemoteProcessGroups(processGroup.getRemoteProcessGroups()); copy.setVariables(processGroup.getVariables()); + copy.setLabels(processGroup.getLabels()); final Set<VersionedProcessGroup> copyChildren = new HashSet<>(); @@ -2944,8 +3099,22 @@ public final class StandardProcessGroup implements ProcessGroup { } applyVersionedComponentIds(processGroup, versionedComponentIds::get); + + // If we versioned any parent groups' Controller Services, set their versioned component id's too. + final ProcessGroup parent = processGroup.getParent(); + if (parent != null) { + for (final ControllerServiceNode service : parent.getControllerServices(true)) { + if (!service.getVersionedComponentId().isPresent()) { + final String versionedId = versionedComponentIds.get(service.getIdentifier()); + if (versionedId != null) { + service.setVersionedComponentId(versionedId); + } + } + } + } } + private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) { processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier())); @@ -2980,6 +3149,14 @@ public final class StandardProcessGroup implements ProcessGroup { .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup)); } + private void setSyncFailedState(final String explanation) { + boolean updated = false; + while (!updated) { + final StandardVersionedFlowStatus status = flowStatus.get(); + final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, explanation, status.getCurrentDifferences()); + updated = flowStatus.compareAndSet(status, updatedStatus); + } + } @Override public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) { @@ -2991,6 +3168,10 @@ public final class StandardProcessGroup implements ProcessGroup { final String registryId = vci.getRegistryIdentifier(); final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); if (flowRegistry == null) 
{ + final String message = String.format("Unable to synchronize Process Group with Flow Registry because Process Group was placed under Version Control using Flow Registry " + + "with identifier %s but cannot find any Flow Registry with this identifier", registryId); + setSyncFailedState(message); + LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry " + "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId); return; @@ -3005,8 +3186,12 @@ public final class StandardProcessGroup implements ProcessGroup { final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents(); vci.setFlowSnapshot(registryFlow); } catch (final IOException | NiFiRegistryException e) { + final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not retrieve version %s of flow with identifier %s in bucket %s", + vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()); + setSyncFailedState(message); + LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}", - new Object[] {this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()}, e); + this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e); return; } } @@ -3027,7 +3212,17 @@ public final class StandardProcessGroup implements ProcessGroup { LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}", new Object[] {this, vci.getVersion(), latestVersion}); } + + boolean updated = false; + while (!updated) { + final StandardVersionedFlowStatus status = flowStatus.get(); + final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(null, null, status.getCurrentDifferences()); + updated = flowStatus.compareAndSet(status, updatedStatus); + } } 
catch (final IOException | NiFiRegistryException e) { + final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry"); + setSyncFailedState(message); + LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e); } } @@ -3041,12 +3236,12 @@ public final class StandardProcessGroup implements ProcessGroup { verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), true); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents()); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); final Set<String> updatedVersionedComponentIds = new HashSet<>(); @@ -3055,6 +3250,25 @@ public final class StandardProcessGroup implements ProcessGroup { continue; } + // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level + // and if so compare our VersionedControllerService to the existing service. + if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) { + final VersionedComponent component = diff.getComponentA() == null ? 
diff.getComponentB() : diff.getComponentA(); + if (ComponentType.CONTROLLER_SERVICE == component.getComponentType()) { + final ControllerServiceNode serviceNode = getVersionedControllerService(this, component.getIdentifier()); + if (serviceNode != null) { + final VersionedControllerService versionedService = mapper.mapControllerService(serviceNode, controllerServiceProvider); + final Set<FlowDifference> differences = flowComparator.compareControllerServices(versionedService, (VersionedControllerService) component); + + if (!differences.isEmpty()) { + updatedVersionedComponentIds.add(component.getIdentifier()); + } + + continue; + } + } + } + final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA(); updatedVersionedComponentIds.add(component.getIdentifier()); @@ -3081,6 +3295,35 @@ public final class StandardProcessGroup implements ProcessGroup { } } + private Set<String> getAncestorGroupServiceIds() { + final Set<String> ancestorServiceIds; + ProcessGroup parentGroup = getParent(); + + if (parentGroup == null) { + ancestorServiceIds = Collections.emptySet(); + } else { + ancestorServiceIds = parentGroup.getControllerServices(true).stream() + .map(ControllerServiceNode::getIdentifier) + .collect(Collectors.toSet()); + } + + return ancestorServiceIds; + } + + private ControllerServiceNode getVersionedControllerService(final ProcessGroup group, final String versionedComponentId) { + if (group == null) { + return null; + } + + for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) { + if (serviceNode.getVersionedComponentId().isPresent() && serviceNode.getVersionedComponentId().get().equals(versionedComponentId)) { + return serviceNode; + } + } + + return getVersionedControllerService(group.getParent(), versionedComponentId); + } + private Set<String> getKnownVariableNames() { final Set<String> variableNames = new HashSet<>(); populateKnownVariableNames(this, variableNames); @@ -3159,6 
+3402,44 @@ public final class StandardProcessGroup implements ProcessGroup { group.setVersionControlInformation(vci, Collections.emptyMap()); } + + // Controller Services + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each + // Controller Service. This way, we ensure that all services have been created before setting the properties. This allows us to + // properly obtain the correct mapping of Controller Service VersionedComponentID to Controller Service instance id. + final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + + final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet()); + + final Map<ControllerServiceNode, VersionedControllerService> services = new HashMap<>(); + + // Add any Controller Service that does not yet exist. 
+ for (final VersionedControllerService proposedService : proposed.getControllerServices()) { + ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier()); + if (service == null) { + service = addControllerService(group, proposedService, componentIdSeed); + LOG.info("Added {} to {}", service, this); + } + + services.put(service, proposedService); + } + + // Update all of the Controller Services to match the VersionedControllerService + for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { + updateControllerService(service, proposedService); + LOG.info("Updated {}", service); + } + + controllerServicesRemoved.remove(proposedService.getIdentifier()); + } + // Child groups final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream() .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); @@ -3179,26 +3460,6 @@ public final class StandardProcessGroup implements ProcessGroup { childGroupsRemoved.remove(proposedChildGroup.getIdentifier()); } - - // Controller Services - final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream() - .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); - final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet()); - - for (final VersionedControllerService proposedService : proposed.getControllerServices()) { - final ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier()); - if (service == null) { - final ControllerServiceNode added = 
addControllerService(group, proposedService, componentIdSeed); - LOG.info("Added {} to {}", added, this); - } else if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { - updateControllerService(service, proposedService); - LOG.info("Updated {}", service); - } - - controllerServicesRemoved.remove(proposedService.getIdentifier()); - } - - // Funnels final Map<String, Funnel> funnelsByVersionedId = group.getFunnels().stream() .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); @@ -3608,7 +3869,7 @@ public final class StandardProcessGroup implements ProcessGroup { service.setAnnotationData(proposed.getAnnotationData()); service.setComments(proposed.getComments()); service.setName(proposed.getName()); - service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties())); + service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup())); if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) { final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); @@ -3728,7 +3989,7 @@ public final class StandardProcessGroup implements ProcessGroup { processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode())); processor.setName(proposed.getName()); processor.setPenalizationPeriod(proposed.getPenaltyDuration()); - processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties())); + processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup())); processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS); processor.setScheduldingPeriod(proposed.getSchedulingPeriod()); 
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy())); @@ -3745,19 +4006,60 @@ public final class StandardProcessGroup implements ProcessGroup { } - private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) { + private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties, + final Map<String, VersionedPropertyDescriptor> proposedDescriptors, final ProcessGroup group) { + final Map<String, String> fullPropertyMap = new HashMap<>(); for (final PropertyDescriptor property : currentProperties.keySet()) { fullPropertyMap.put(property.getName(), null); } if (proposedProperties != null) { - fullPropertyMap.putAll(proposedProperties); + for (final Map.Entry<String, String> entry : proposedProperties.entrySet()) { + final String propertyName = entry.getKey(); + final VersionedPropertyDescriptor descriptor = proposedDescriptors.get(propertyName); + + String value; + if (descriptor != null && descriptor.getIdentifiesControllerService()) { + // Property identifies a Controller Service. So the value that we want to assign is not the value given. + // The value given is instead the Versioned Component ID of the Controller Service. We want to resolve this + // to the instance ID of the Controller Service. + final String serviceVersionedComponentId = entry.getValue(); + final String instanceId = getServiceInstanceId(serviceVersionedComponentId, group); + value = instanceId == null ? 
serviceVersionedComponentId : instanceId; + } else { + value = entry.getValue(); + } + + fullPropertyMap.put(propertyName, value); + } } return fullPropertyMap; } + private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) { + for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) { + final Optional<String> optionalVersionedId = serviceNode.getVersionedComponentId(); + if (!optionalVersionedId.isPresent()) { + continue; + } + + final String versionedId = optionalVersionedId.get(); + if (versionedId.equals(serviceVersionedComponentId)) { + return serviceNode.getIdentifier(); + } + } + + final ProcessGroup parent = group.getParent(); + if (parent == null) { + return null; + } + + return getServiceInstanceId(serviceVersionedComponentId, parent); + + } + private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) { final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris()); rpg.setVersionedComponentId(proposed.getIdentifier()); @@ -3773,12 +4075,12 @@ public final class StandardProcessGroup implements ProcessGroup { rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout()); rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream() .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier())) - .collect(Collectors.toSet())); + .collect(Collectors.toSet()), false); rpg.setName(proposed.getName()); rpg.setNetworkInterface(proposed.getLocalNetworkInterface()); rpg.setOutputPorts(proposed.getOutputPorts() == null ? 
Collections.emptySet() : proposed.getOutputPorts().stream() .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier())) - .collect(Collectors.toSet())); + .collect(Collectors.toSet()), false); rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); rpg.setProxyHost(proposed.getProxyHost()); rpg.setProxyPort(proposed.getProxyPort()); @@ -3831,12 +4133,12 @@ public final class StandardProcessGroup implements ProcessGroup { } final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false); final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot()); - final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set<FlowDifference> differences = comparison.getDifferences(); final Set<FlowDifference> functionalDifferences = differences.stream() http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java new file mode 100644 index 0000000..f362c1e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.groups; + +import java.util.Set; + +import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; +import org.apache.nifi.registry.flow.diff.FlowDifference; + +class StandardVersionedFlowStatus implements VersionedFlowStatus { + private final VersionedFlowState state; + private final String explanation; + private final Set<FlowDifference> currentDifferences; + + StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation, final Set<FlowDifference> differences) { + this.state = state; + this.explanation = explanation; + this.currentDifferences = differences; + } + + @Override + public VersionedFlowState getState() { + return state; + } + + @Override + public String getStateExplanation() { + return explanation; + } + + Set<FlowDifference> getCurrentDifferences() { + return currentDifferences; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java index 92a4166..106d19a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java @@ -34,6 +34,7 @@ public class StandardVersionControlInformation implements VersionControlInformat private 
volatile VersionedProcessGroup flowSnapshot; private volatile boolean modified; private volatile boolean current; + private final VersionedFlowStatus status; public static class Builder { private String registryIdentifier; @@ -47,6 +48,7 @@ public class StandardVersionControlInformation implements VersionControlInformat private VersionedProcessGroup flowSnapshot; private Boolean modified = null; private Boolean current = null; + private VersionedFlowStatus status; public Builder registryId(String registryId) { this.registryIdentifier = registryId; @@ -103,6 +105,11 @@ public class StandardVersionControlInformation implements VersionControlInformat return this; } + public Builder status(final VersionedFlowStatus status) { + this.status = status; + return this; + } + public static Builder fromDto(VersionControlInformationDTO dto) { Builder builder = new Builder(); builder.registryId(dto.getRegistryId()) @@ -126,7 +133,7 @@ public class StandardVersionControlInformation implements VersionControlInformat Objects.requireNonNull(version, "Version must be specified"); final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName, - bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current); + bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current, status); svci.setBucketName(bucketName); svci.setFlowName(flowName); @@ -138,7 +145,7 @@ public class StandardVersionControlInformation implements VersionControlInformat public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version, - final VersionedProcessGroup snapshot, final boolean modified, final boolean current) { + final VersionedProcessGroup snapshot, final boolean modified, final boolean current, final VersionedFlowStatus status) { this.registryIdentifier = registryId; this.registryName = registryName; this.bucketIdentifier = bucketId; 
@@ -147,6 +154,7 @@ public class StandardVersionControlInformation implements VersionControlInformat this.flowSnapshot = snapshot; this.modified = modified; this.current = current; + this.status = status; } @@ -232,4 +240,9 @@ public class StandardVersionControlInformation implements VersionControlInformat public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) { this.flowSnapshot = flowSnapshot; } + + @Override + public VersionedFlowStatus getStatus() { + return status; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdef5b56/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java deleted file mode 100644 index 193bde8..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.registry.flow.mapping; - -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; - -import org.apache.nifi.registry.flow.BatchSize; -import org.apache.nifi.registry.flow.Bundle; -import org.apache.nifi.registry.flow.ComponentType; -import org.apache.nifi.registry.flow.ConnectableComponent; -import org.apache.nifi.registry.flow.ConnectableComponentType; -import org.apache.nifi.registry.flow.ControllerServiceAPI; -import org.apache.nifi.registry.flow.PortType; -import org.apache.nifi.registry.flow.Position; -import org.apache.nifi.registry.flow.VersionedConnection; -import org.apache.nifi.registry.flow.VersionedControllerService; -import org.apache.nifi.registry.flow.VersionedFunnel; -import org.apache.nifi.registry.flow.VersionedLabel; -import org.apache.nifi.registry.flow.VersionedPort; -import org.apache.nifi.registry.flow.VersionedProcessGroup; -import org.apache.nifi.registry.flow.VersionedProcessor; -import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; -import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; -import org.apache.nifi.web.api.dto.BatchSettingsDTO; -import org.apache.nifi.web.api.dto.BundleDTO; -import org.apache.nifi.web.api.dto.ConnectableDTO; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.ControllerServiceApiDTO; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import 
org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.FunnelDTO; -import org.apache.nifi.web.api.dto.LabelDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.dto.PositionDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorConfigDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; - - -public class NiFiRegistryDtoMapper { - // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when - // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned' - // identifier based on the comopnent's actual id. We do connections last, so that all components will already have been - // created before attempting to create the connection, where the ConnectableDTO is converted. 
- private Map<String, String> versionedComponentIds = new HashMap<>(); - - public VersionedProcessGroup mapProcessGroup(final ProcessGroupDTO dto) { - versionedComponentIds.clear(); - return mapGroup(dto); - } - - private VersionedProcessGroup mapGroup(final ProcessGroupDTO dto) { - final VersionedProcessGroup versionedGroup = new VersionedProcessGroup(); - versionedGroup.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - versionedGroup.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - versionedGroup.setName(dto.getName()); - versionedGroup.setComments(dto.getComments()); - versionedGroup.setPosition(mapPosition(dto.getPosition())); - - final FlowSnippetDTO contents = dto.getContents(); - - versionedGroup.setControllerServices(contents.getControllerServices().stream() - .map(this::mapControllerService) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setFunnels(contents.getFunnels().stream() - .map(this::mapFunnel) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setInputPorts(contents.getInputPorts().stream() - .map(this::mapPort) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setOutputPorts(contents.getOutputPorts().stream() - .map(this::mapPort) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setLabels(contents.getLabels().stream() - .map(this::mapLabel) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setProcessors(contents.getProcessors().stream() - .map(this::mapProcessor) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setRemoteProcessGroups(contents.getRemoteProcessGroups().stream() - .map(this::mapRemoteProcessGroup) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setProcessGroups(contents.getProcessGroups().stream() - .map(this::mapGroup) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - 
versionedGroup.setConnections(contents.getConnections().stream() - .map(this::mapConnection) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - return versionedGroup; - } - - private String getId(final String currentVersionedId, final String componentId) { - final String versionedId; - if (currentVersionedId == null) { - versionedId = UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString(); - } else { - versionedId = currentVersionedId; - } - - versionedComponentIds.put(componentId, versionedId); - return versionedId; - } - - private String getGroupId(final String groupId) { - return versionedComponentIds.get(groupId); - } - - public VersionedConnection mapConnection(final ConnectionDTO dto) { - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - connection.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - connection.setName(dto.getName()); - connection.setBackPressureDataSizeThreshold(dto.getBackPressureDataSizeThreshold()); - connection.setBackPressureObjectThreshold(dto.getBackPressureObjectThreshold()); - connection.setFlowFileExpiration(dto.getFlowFileExpiration()); - connection.setLabelIndex(dto.getLabelIndex()); - connection.setPosition(mapPosition(dto.getPosition())); - connection.setPrioritizers(dto.getPrioritizers()); - connection.setSelectedRelationships(dto.getSelectedRelationships()); - connection.setzIndex(dto.getzIndex()); - - connection.setBends(dto.getBends().stream() - .map(this::mapPosition) - .collect(Collectors.toList())); - - connection.setSource(mapConnectable(dto.getSource())); - connection.setDestination(mapConnectable(dto.getDestination())); - - return connection; - } - - public ConnectableComponent mapConnectable(final ConnectableDTO dto) { - final ConnectableComponent component = new ConnectableComponent(); - - final String versionedId = dto.getVersionedComponentId(); - if (versionedId == null) { - 
final String resolved = versionedComponentIds.get(dto.getId()); - if (resolved == null) { - throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + dto.getId() + " to any version-controlled component"); - } - - component.setId(resolved); - } else { - component.setId(versionedId); - } - - component.setComments(dto.getComments()); - component.setGroupId(dto.getGroupId()); - component.setName(dto.getName()); - component.setType(ConnectableComponentType.valueOf(dto.getType())); - return component; - } - - public VersionedControllerService mapControllerService(final ControllerServiceDTO dto) { - final VersionedControllerService service = new VersionedControllerService(); - service.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - service.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - service.setName(dto.getName()); - service.setAnnotationData(dto.getAnnotationData()); - service.setBundle(mapBundle(dto.getBundle())); - service.setComments(dto.getComments()); - service.setControllerServiceApis(dto.getControllerServiceApis().stream() - .map(this::mapControllerServiceApi) - .collect(Collectors.toList())); - service.setProperties(dto.getProperties()); - service.setType(dto.getType()); - return null; - } - - private Bundle mapBundle(final BundleDTO dto) { - final Bundle bundle = new Bundle(); - bundle.setGroup(dto.getGroup()); - bundle.setArtifact(dto.getArtifact()); - bundle.setVersion(dto.getVersion()); - return bundle; - } - - private ControllerServiceAPI mapControllerServiceApi(final ControllerServiceApiDTO dto) { - final ControllerServiceAPI api = new ControllerServiceAPI(); - api.setBundle(mapBundle(dto.getBundle())); - api.setType(dto.getType()); - return api; - } - - public VersionedFunnel mapFunnel(final FunnelDTO dto) { - final VersionedFunnel funnel = new VersionedFunnel(); - funnel.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - 
funnel.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - funnel.setPosition(mapPosition(dto.getPosition())); - return funnel; - } - - public VersionedLabel mapLabel(final LabelDTO dto) { - final VersionedLabel label = new VersionedLabel(); - label.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - label.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - label.setHeight(dto.getHeight()); - label.setWidth(dto.getWidth()); - label.setLabel(dto.getLabel()); - label.setPosition(mapPosition(dto.getPosition())); - label.setStyle(dto.getStyle()); - return label; - } - - public VersionedPort mapPort(final PortDTO dto) { - final VersionedPort port = new VersionedPort(); - port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - port.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - port.setComments(dto.getComments()); - port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount()); - port.setName(dto.getName()); - port.setPosition(mapPosition(dto.getPosition())); - port.setType(PortType.valueOf(dto.getType())); - return port; - } - - public Position mapPosition(final PositionDTO dto) { - final Position position = new Position(); - position.setX(dto.getX()); - position.setY(dto.getY()); - return position; - } - - public VersionedProcessor mapProcessor(final ProcessorDTO dto) { - final ProcessorConfigDTO config = dto.getConfig(); - - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - processor.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - processor.setType(dto.getType()); - processor.setAnnotationData(config.getAnnotationData()); - processor.setAutoTerminatedRelationships(config.getAutoTerminatedRelationships()); - processor.setBulletinLevel(config.getBulletinLevel()); - processor.setBundle(mapBundle(dto.getBundle())); - processor.setComments(config.getComments()); - 
processor.setConcurrentlySchedulableTaskCount(config.getConcurrentlySchedulableTaskCount()); - processor.setExecutionNode(config.getExecutionNode()); - processor.setName(dto.getName()); - processor.setPenaltyDuration(config.getPenaltyDuration()); - processor.setPosition(mapPosition(dto.getPosition())); - processor.setProperties(config.getProperties()); - processor.setRunDurationMillis(config.getRunDurationMillis()); - processor.setSchedulingPeriod(config.getSchedulingPeriod()); - processor.setSchedulingStrategy(config.getSchedulingStrategy()); - processor.setStyle(dto.getStyle()); - processor.setYieldDuration(config.getYieldDuration()); - return processor; - } - - public VersionedRemoteProcessGroup mapRemoteProcessGroup(final RemoteProcessGroupDTO dto) { - final VersionedRemoteProcessGroup rpg = new VersionedRemoteProcessGroup(); - rpg.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - rpg.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - rpg.setComments(dto.getComments()); - rpg.setCommunicationsTimeout(dto.getCommunicationsTimeout()); - rpg.setLocalNetworkInterface(dto.getLocalNetworkInterface()); - rpg.setName(dto.getName()); - rpg.setInputPorts(dto.getContents().getInputPorts().stream() - .map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT)) - .collect(Collectors.toSet())); - rpg.setOutputPorts(dto.getContents().getOutputPorts().stream() - .map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT)) - .collect(Collectors.toSet())); - rpg.setPosition(mapPosition(dto.getPosition())); - rpg.setProxyHost(dto.getProxyHost()); - rpg.setProxyPort(dto.getProxyPort()); - rpg.setProxyUser(dto.getProxyUser()); - rpg.setTargetUri(dto.getTargetUri()); - rpg.setTargetUris(dto.getTargetUris()); - rpg.setTransportProtocol(dto.getTransportProtocol()); - rpg.setYieldDuration(dto.getYieldDuration()); - return rpg; - } - - public VersionedRemoteGroupPort mapRemotePort(final RemoteProcessGroupPortDTO dto, final ComponentType 
componentType) { - final VersionedRemoteGroupPort port = new VersionedRemoteGroupPort(); - port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - port.setGroupIdentifier(getGroupId(dto.getGroupId())); - port.setComments(dto.getComments()); - port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount()); - port.setRemoteGroupId(dto.getGroupId()); - port.setName(dto.getName()); - port.setUseCompression(dto.getUseCompression()); - port.setBatchSize(mapBatchSettings(dto.getBatchSettings())); - port.setTargetId(dto.getTargetId()); - port.setComponentType(componentType); - return port; - } - - private BatchSize mapBatchSettings(final BatchSettingsDTO dto) { - final BatchSize batchSize = new BatchSize(); - batchSize.setCount(dto.getCount()); - batchSize.setDuration(dto.getDuration()); - batchSize.setSize(dto.getSize()); - return batchSize; - } -}
