http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java index 94e0745..a91ce97 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java @@ -18,6 +18,7 @@ package org.apache.nifi.connectable; import org.apache.nifi.authorization.resource.ComponentAuthorizable; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.controller.Triggerable; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessSession; @@ -32,11 +33,12 @@ import java.util.concurrent.TimeUnit; /** * Represents a connectable component to which or from which data can flow. */ -public interface Connectable extends Triggerable, ComponentAuthorizable, Positionable { +public interface Connectable extends Triggerable, ComponentAuthorizable, Positionable, VersionedComponent { /** * @return the unique identifier for this <code>Connectable</code> */ + @Override String getIdentifier(); /**
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java index acdcec6..423f52d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java @@ -17,6 +17,7 @@ package org.apache.nifi.connectable; import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.groups.ProcessGroup; @@ -27,7 +28,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; -public interface Connection extends Authorizable { +public interface Connection extends Authorizable, VersionedComponent { void enqueue(FlowFileRecord flowFile); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java index 7190fd4..0240648 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java @@ -42,6 +42,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -81,6 +82,7 @@ public abstract class AbstractPort implements Port { private final AtomicReference<String> penalizationPeriod; private final AtomicReference<String> yieldPeriod; private final AtomicReference<String> schedulingPeriod; + private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final AtomicLong schedulingNanos; private final AtomicLong yieldExpiration; private final ProcessScheduler processScheduler; @@ -635,4 +637,27 @@ public abstract class AbstractPort implements Port { @Override public void verifyCanClearState() { } + + @Override + public Optional<String> getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String versionedComponentId) { + boolean updated = false; + while (!updated) { + final String currentId = this.versionedComponentId.get(); + + if (currentId == null) { + updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); + } else if (currentId.equals(versionedComponentId)) { + return; + } else if (versionedComponentId == null) { + updated = this.versionedComponentId.compareAndSet(currentId, null); + } else { + throw new IllegalStateException(this + " is already under version control"); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java index 34ffbac..4b3507c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@ -43,6 +43,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -77,6 +78,7 @@ public class StandardFunnel implements Funnel { private final AtomicBoolean lossTolerant; private final AtomicReference<ScheduledState> scheduledState; private final AtomicLong yieldExpiration; + private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -557,4 +559,27 @@ public class StandardFunnel implements Funnel { public String getComponentType() { return "Funnel"; } + + @Override + public Optional<String> getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String versionedComponentId) { + boolean updated = false; + while (!updated) { + final String currentId = this.versionedComponentId.get(); + + if (currentId == null) { + updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); + } else if (currentId.equals(versionedComponentId)) { + return; + } else if (versionedComponentId == null) { + updated = this.versionedComponentId.compareAndSet(currentId, null); + } else { + throw new IllegalStateException(this + " is already under version control"); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java index bc1be00..d463725 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java @@ -17,14 +17,16 @@ package org.apache.nifi.controller.label; import org.apache.nifi.authorization.resource.ComponentAuthorizable; +import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.connectable.Positionable; import org.apache.nifi.connectable.Size; import org.apache.nifi.groups.ProcessGroup; import java.util.Map; -public interface Label extends ComponentAuthorizable, Positionable { +public interface Label extends ComponentAuthorizable, Positionable, VersionedComponent { + @Override String getIdentifier(); Map<String, String> getStyle(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 3dd1076..2f28963 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.service; +import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.LoggableComponent; @@ -26,7 +27,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -public interface ControllerServiceNode extends ConfiguredComponent { +public interface ControllerServiceNode extends ConfiguredComponent, VersionedComponent { /** * @return the Process Group that this Controller Service belongs to, or <code>null</code> if the Controller Service http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 0baba23..8934788 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.groups; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; @@ -24,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; import org.apache.nifi.authorization.resource.ComponentAuthorizable; +import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; @@ -39,6 +41,10 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.Processor; import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.UnknownResourceException; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.remote.RemoteGroupPort; /** @@ -50,7 +56,7 @@ import org.apache.nifi.remote.RemoteGroupPort; * <p> * MUST BE THREAD-SAFE</p> */ -public interface ProcessGroup extends ComponentAuthorizable, Positionable { +public interface ProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent { /** * Predicate for filtering schedulable Processors. @@ -772,6 +778,17 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable { void move(final Snippet snippet, final ProcessGroup destination); /** + * Updates the Process Group to match the proposed flow + * + * @param proposedSnapshot the proposed flow + * @param componentIdSeed a seed value to use when generating ID's for new components + * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>, + * and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will + * throw an IllegalStateException + */ + void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty); + + /** * Verifies a template with the specified name can be created. * * @param name name of the template @@ -832,6 +849,18 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable { void verifyCanUpdateVariables(Map<String, String> updatedVariables); /** + * Ensure that the contents of the Process Group can be update to match the given new flow + * + * @param updatedFlow the updated version of the flow + * @param verifyConnectionRemoval whether or not to verify that connections that are not present in the updated flow can be removed + * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If <code>true</code> and the Process Group has been changed since + * it was last synchronized with the FlowRegistry, then this method will throw an IllegalStateException + * + * @throws IllegalStateException if the Process Group is not in a state that will allow the update + */ + void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty); + + /** * Adds the given template to this Process Group * * @param template the template to add @@ -894,4 +923,30 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable { * @return a set of all components that are affected by the variable with the given name */ Set<ConfiguredComponent> getComponentsAffectedByVariable(String variableName); + + /** + * @return the version control information that indicates where this flow is stored in a Flow Registry, + * or <code>null</code> if this Process Group is not under version control. + */ + VersionControlInformation getVersionControlInformation(); + + /** + * Updates the Version Control Information for this Process Group + * + * @param versionControlInformation specification of where the flow is tracked in Version Control + * @param versionedComponentIds a mapping of component ID's to Versioned Component ID's. This is used to update the components in the + * Process Group so that the components that exist in the Process Group can be associated with the corresponding components in the + * Version Controlled flow + */ + void setVersionControlInformation(VersionControlInformation versionControlInformation, Map<String, String> versionedComponentIds); + + /** + * Synchronizes the Process Group with the given Flow Registry, determining whether or not the local flow + * is up to date with the newest version of the flow in the Registry and whether or not the local flow has been + * modified since it was last synced with the Flow Registry. If this Process Group is not under Version Control, + * this method will have no effect. + * + * @param flowRegistry the Flow Registry to synchronize with + */ + void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 79b9509..e4da31b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -18,6 +18,7 @@ package org.apache.nifi.groups; import org.apache.nifi.authorization.resource.ComponentAuthorizable; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.connectable.Positionable; import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.events.EventReporter; @@ -30,7 +31,7 @@ import java.util.Date; import java.util.Set; import java.util.concurrent.TimeUnit; -public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable { +public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent { @Override String getIdentifier(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java index b797749..2f9a9fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java @@ -40,6 +40,11 @@ public interface RemoteProcessGroupPortDescriptor { String getTargetId(); /** + * @return the ID corresponding to the component that is under version control + */ + String getVersionedComponentId(); + + /** * @return id of the remote process group that this port resides in */ String getGroupId(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java new file mode 100644 index 0000000..a5bb738 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +import java.io.IOException; + +public interface FlowRegistry { + + /** + * @return the URL of the Flow Registry + */ + String getURL(); + + /** + * Registers the given Versioned Flow with the Flow Registry + * + * @param flow the Versioned Flow to add to the registry + * @return the fully populated VersionedFlow + * + * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier or name is null + * @throws UnknownResourceException if the bucket id does not exist + */ + VersionedFlow registerVersionedFlow(VersionedFlow flow) throws IOException, UnknownResourceException; + + /** + * Adds the given snapshot to the Flow Registry for the given flow + * + * @param flow the Versioned Flow + * @param snapshot the snapshot of the flow + * @param comments any comments for the snapshot + * @return the versioned flow snapshot + * + * @throws IOException if unable to communicate with the registry + * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier is null, or if the flow to snapshot is null + * @throws UnknownResourceException if the flow does not exist + */ + VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException; + + /** + * Returns the latest (most recent) version of the Flow in the Flow Registry for the given bucket and flow + * + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + * @return the latest version of the Flow + * + * @throws IOException if unable to communicate with the Flow Registry + * @throws UnknownResourceException if unable to find the bucket with the given ID or the flow with the given ID + */ + int getLatestVersion(String bucketId, String flowId) throws IOException, UnknownResourceException; + + /** + * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry + * + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + * @param version the version to retrieve + * @return the contents of the Flow from the Flow Registry + * + * @throws IOException if unable to communicate with the Flow Registry + * @throws UnknownResourceException if unable to find the contents of the flow due to the bucket or flow not existing, + * or the specified version of the flow not existing + * @throws NullPointerException if any of the arguments is not specified + * @throws IllegalArgumentException if the given version is less than 1 + */ + VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, UnknownResourceException; + + /** + * Retrieves a VersionedFlow by bucket id and flow id + * + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + * @return the VersionedFlow for the given bucket and flow ID's + * + * @throws IOException if unable to communicate with the Flow Registry + * @throws UnknownResourceException if unable to find a flow with the given bucket ID and flow ID + */ + VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, UnknownResourceException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java new file mode 100644 index 0000000..83f66dc --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +import java.util.Set; + +public interface FlowRegistryClient { + FlowRegistry getFlowRegistry(String registryId); + + default String getFlowRegistryId(String registryUrl) { + for (final String registryClientId : getRegistryIdentifiers()) { + final FlowRegistry registry = getFlowRegistry(registryClientId); + if (registry.getURL().equals(registryUrl)) { + return registryClientId; + } + } + + return null; + } + + Set<String> getRegistryIdentifiers(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java new file mode 100644 index 0000000..8c95e67 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +public class UnknownResourceException extends Exception { + + public UnknownResourceException(String message) { + super(message); + } + + public UnknownResourceException(String message, Throwable cause) { + super(message, cause); + } + + public UnknownResourceException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java new file mode 100644 index 0000000..ea70b1c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +import java.util.Optional; + +/** + * <p> + * Provides a mechanism for conveying which Flow Registry a flow is stored in, and + * where in the Flow Registry the flow is stored. + * </p> + */ +public interface VersionControlInformation { + + /** + * @return the unique identifier of the Flow Registry that this flow is tracking to + */ + String getRegistryIdentifier(); + + /** + * @return the unique identifier of the bucket that this flow belongs to + */ + String getBucketIdentifier(); + + /** + * @return the unique identifier of this flow in the Flow Registry + */ + String getFlowIdentifier(); + + /** + * @return the version of the flow in the Flow Registry that this flow is based on. + */ + int getVersion(); + + /** + * @return <code>true</code> if the flow has been modified since the last time that it was updated from the Flow Registry or saved + * to the Flow Registry; <code>false</code> if the flow is in sync with the Flow Registry. An empty optional will be returned + * if it is not yet known whether or not the flow has been modified (for example, on startup, when the flow has not yet been + * fetched from the Flow Registry) + */ + Optional<Boolean> getModified(); + + /** + * @return <code>true</code> if this version of the flow is the most recent version of the flow available in the Flow Registry. + * An empty optional will be returned if it is not yet known whether or not the flow has been modified (for example, on startup, + * when the flow has not yet been fetched from the Flow Registry) + */ + Optional<Boolean> getCurrent(); + + /** + * @return the snapshot of the flow that was synchronized with the Flow Registry + */ + VersionedProcessGroup getFlowSnapshot(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index d53eb49..09d032e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -152,6 +152,14 @@ <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-data-model</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-flow-diff</artifactId> + </dependency> <dependency> <groupId>org.apache.curator</groupId> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 728e8cf..7aa3003 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -47,6 +47,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -71,6 +72,7 @@ public final class StandardConnection implements Connection { private final StandardFlowFileQueue flowFileQueue; private final AtomicInteger labelIndex = new AtomicInteger(1); private final AtomicLong zIndex = new AtomicLong(0L); + private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final ProcessScheduler scheduler; private final int hashCode; @@ -519,4 +521,27 @@ public final class StandardConnection implements Connection { } } } + + @Override + public Optional<String> getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String versionedComponentId) { + boolean updated = false; + while (!updated) { + final String currentId = this.versionedComponentId.get(); + + if (currentId == null) { + updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); + } else if (currentId.equals(versionedComponentId)) { + return; + } else if (versionedComponentId == null) { + updated = this.versionedComponentId.compareAndSet(currentId, null); + } else { + throw new IllegalStateException(this + " is already under version control"); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 56b2590..242ef6a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -164,6 +164,7 @@ import org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.remote.HttpRemoteSiteListener; @@ -329,6 +330,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization; private final LeaderElectionManager leaderElectionManager; private final ClusterCoordinator clusterCoordinator; + private final FlowRegistryClient flowRegistryClient; /** * true if controller is configured to operate in a clustered environment @@ -395,7 +397,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final AuditService auditService, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, - final VariableRegistry variableRegistry) { + final VariableRegistry variableRegistry, + final FlowRegistryClient flowRegistryClient) { return new FlowController( flowFileEventRepo, @@ -409,7 +412,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /* cluster coordinator */ null, /* heartbeat monitor */ null, /* leader election manager */ null, - /* variable registry */ variableRegistry); + /* variable registry */ variableRegistry, + flowRegistryClient); } public static FlowController createClusteredInstance( @@ -423,7 +427,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ClusterCoordinator clusterCoordinator, final HeartbeatMonitor heartbeatMonitor, final LeaderElectionManager leaderElectionManager, - final VariableRegistry variableRegistry) { + final VariableRegistry variableRegistry, + final FlowRegistryClient flowRegistryClient) { final FlowController flowController = new FlowController( flowFileEventRepo, @@ -437,7 +442,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R clusterCoordinator, heartbeatMonitor, leaderElectionManager, - variableRegistry); + variableRegistry, + flowRegistryClient); return flowController; } @@ -454,7 +460,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ClusterCoordinator clusterCoordinator, final HeartbeatMonitor heartbeatMonitor, final LeaderElectionManager leaderElectionManager, - final VariableRegistry variableRegistry) { + final VariableRegistry variableRegistry, + final FlowRegistryClient flowRegistryClient) { maxTimerDrivenThreads = new AtomicInteger(10); maxEventDrivenThreads = new AtomicInteger(5); @@ -516,6 +523,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R startRemoteGroupPortsAfterInitialization = new ArrayList<>(); this.authorizer = authorizer; this.auditService = auditService; + this.flowRegistryClient = flowRegistryClient; final String gracefulShutdownSecondsVal = nifiProperties.getProperty(GRACEFUL_SHUTDOWN_PERIOD); long shutdownSecs; @@ -754,6 +762,23 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } }, 0L, 30L, TimeUnit.SECONDS); + timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + final ProcessGroup rootGroup = getRootGroup(); + final List<ProcessGroup> allGroups = rootGroup.findAllProcessGroups(); + allGroups.add(rootGroup); + + for (final ProcessGroup group : allGroups) { + try { + group.synchronizeWithFlowRegistry(flowRegistryClient); + } catch (final Exception e) { + LOG.error("Failed to synchronize {} with Flow Registry", group, e); + } + } + } + }, 5, 60, TimeUnit.SECONDS); + initialized.set(true); } finally { writeLock.unlock(); @@ -3311,6 +3336,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return new HashSet<>(reportingTasks.values()); } + public FlowRegistryClient getFlowRegistryClient() { + return flowRegistryClient; + } + @Override public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded) { final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, bundleCoordinate, additionalUrls, firstTimeAdded); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 3a0b093..e879e38 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -85,6 +85,8 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; +import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; @@ -113,6 +115,7 @@ import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.TemplateDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; @@ -1048,6 +1051,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion); final ProcessGroup processGroup = controller.createProcessGroup(processGroupDTO.getId()); processGroup.setComments(processGroupDTO.getComments()); + processGroup.setVersionedComponentId(processGroupDTO.getVersionedComponentId()); processGroup.setPosition(toPosition(processGroupDTO.getPosition())); processGroup.setName(processGroupDTO.getName()); processGroup.setParent(parentGroup); @@ -1072,6 +1076,20 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { processGroup.setVariables(variables); + final VersionControlInformationDTO versionControlInfoDto = processGroupDTO.getVersionControlInformation(); + if (versionControlInfoDto != null) { + final String registryId = versionControlInfoDto.getRegistryId(); + final String bucketId = versionControlInfoDto.getBucketId(); + final String flowId = versionControlInfoDto.getFlowId(); + final int version = versionControlInfoDto.getVersion(); + final boolean modified = false; + final boolean current = true; + + final VersionControlInformation versionControlInformation = new StandardVersionControlInformation(registryId, bucketId, flowId, version, null, modified, current); + // pass empty map for the version control mapping because the VersionedComponentId has already been set on the components + processGroup.setVersionControlInformation(versionControlInformation, Collections.emptyMap()); + } + // Add Controller Services final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService"); if (!serviceNodeList.isEmpty()) { @@ -1097,6 +1115,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId(), coordinate, false); + procNode.setVersionedComponentId(processorDTO.getVersionedComponentId()); processGroup.addProcessor(procNode); updateProcessor(procNode, processorDTO, processGroup, controller); } @@ -1113,6 +1132,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { port = controller.createLocalInputPort(portDTO.getId(), portDTO.getName()); } + port.setVersionedComponentId(portDTO.getVersionedComponentId()); port.setPosition(toPosition(portDTO.getPosition())); port.setComments(portDTO.getComments()); port.setProcessGroup(processGroup); @@ -1156,6 +1176,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } else { port = controller.createLocalOutputPort(portDTO.getId(), portDTO.getName()); } + + port.setVersionedComponentId(portDTO.getVersionedComponentId()); port.setPosition(toPosition(portDTO.getPosition())); port.setComments(portDTO.getComments()); port.setProcessGroup(processGroup); @@ -1193,6 +1215,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { for (final Element funnelElement : funnelNodeList) { final FunnelDTO funnelDTO = FlowFromDOMFactory.getFunnel(funnelElement); final Funnel funnel = controller.createFunnel(funnelDTO.getId()); + funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId()); funnel.setPosition(toPosition(funnelDTO.getPosition())); // Since this is called during startup, we want to add the funnel without enabling it @@ -1207,6 +1230,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { for (final Element labelElement : labelNodeList) { final LabelDTO labelDTO = FlowFromDOMFactory.getLabel(labelElement); final Label label = controller.createLabel(labelDTO.getId(), labelDTO.getLabel()); + label.setVersionedComponentId(labelDTO.getVersionedComponentId()); label.setStyle(labelDTO.getStyle()); label.setPosition(toPosition(labelDTO.getPosition())); @@ -1225,6 +1249,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) { final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor); final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUris()); + remoteGroup.setVersionedComponentId(remoteGroupDto.getVersionedComponentId()); remoteGroup.setComments(remoteGroupDto.getComments()); remoteGroup.setPosition(toPosition(remoteGroupDto.getPosition())); final String name = remoteGroupDto.getName(); @@ -1332,6 +1357,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } final Connection connection = controller.createConnection(dto.getId(), dto.getName(), source, destination, dto.getSelectedRelationships()); + connection.setVersionedComponentId(dto.getVersionedComponentId()); connection.setProcessGroup(processGroup); final List<Position> bendPoints = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 88912aa..187b62f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -126,6 +127,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final AtomicInteger concurrentTaskCount; private final AtomicLong yieldExpiration; private final AtomicLong schedulingNanos; + private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final ProcessScheduler processScheduler; private long runNanos = 0L; private volatile long yieldNanos; @@ -1511,4 +1513,26 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return group == null ? null : group.getIdentifier(); } + @Override + public Optional<String> getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String versionedComponentId) { + boolean updated = false; + while (!updated) { + final String currentId = this.versionedComponentId.get(); + + if (currentId == null) { + updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); + } else if (currentId.equals(versionedComponentId)) { + return; + } else if (versionedComponentId == null) { + updated = this.versionedComponentId.compareAndSet(currentId, null); + } else { + throw new IllegalStateException(this + " is already under version control"); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java index 32e742d..2e98e84 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java @@ -28,6 +28,7 @@ import org.apache.nifi.util.CharacterFilterUtils; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; public class StandardLabel implements Label { @@ -38,6 +39,7 @@ public class StandardLabel implements Label { private final AtomicReference<Map<String, String>> style; private final AtomicReference<String> value; private final AtomicReference<ProcessGroup> processGroup; + private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); public StandardLabel(final String identifier, final String value) { this(identifier, new Position(0D, 0D), new HashMap<String, String>(), value, null); @@ -76,6 +78,7 @@ public class StandardLabel implements Label { } } + @Override public String getIdentifier() { return identifier; } @@ -96,10 +99,12 @@ public class StandardLabel implements Label { return ResourceFactory.getComponentResource(ResourceType.Label, getIdentifier(),"Label"); } + @Override public Map<String, String> getStyle() { return style.get(); } + @Override public void setStyle(final Map<String, String> style) { if (style != null) { boolean updated = false; @@ -112,19 +117,46 @@ public class StandardLabel implements Label { } } + @Override public String getValue() { return value.get(); } + @Override public void setValue(final String value) { this.value.set(CharacterFilterUtils.filterInvalidXmlCharacters(value)); } + @Override public void setProcessGroup(final ProcessGroup group) { this.processGroup.set(group); } + @Override public ProcessGroup getProcessGroup() { return processGroup.get(); } + + @Override + public Optional<String> getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String versionedComponentId) { + boolean updated = false; + while (!updated) { + final String currentId = this.versionedComponentId.get(); + + if (currentId == null) { + updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); + } else if (currentId.equals(versionedComponentId)) { + return; + } else if (versionedComponentId == null) { + updated = this.versionedComponentId.compareAndSet(currentId, null); + } else { + throw new IllegalStateException(this + " is already under version control"); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index f67ecd9..a2a589a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -50,6 +50,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.w3c.dom.Element; import org.w3c.dom.NodeList; @@ -102,6 +103,7 @@ public class FlowFromDOMFactory { final ControllerServiceDTO dto = new ControllerServiceDTO(); dto.setId(getString(element, "id")); + dto.setVersionedComponentId(getString(element, "versionedComponentId")); dto.setName(getString(element, "name")); dto.setComments(getString(element, "comment")); dto.setType(getString(element, "class")); @@ -138,6 +140,7 @@ public class FlowFromDOMFactory { final ProcessGroupDTO dto = new ProcessGroupDTO(); final String groupId = getString(element, "id"); dto.setId(groupId); + dto.setVersionedComponentId(getString(element, "versionedComponentId")); dto.setParentGroupId(parentId); dto.setName(getString(element, "name")); dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); @@ -153,6 +156,9 @@ public class FlowFromDOMFactory { } dto.setVariables(variables); + final Element versionControlInfoElement = DomUtils.getChild(element, "versionControlInformation"); + dto.setVersionControlInformation(getVersionControlInformation(versionControlInfoElement)); + final Set<ProcessorDTO> processors = new HashSet<>(); final Set<ConnectionDTO> connections = new HashSet<>(); final Set<FunnelDTO> funnels = new HashSet<>(); @@ -216,12 +222,26 @@ public class FlowFromDOMFactory { return dto; } + private static VersionControlInformationDTO getVersionControlInformation(final Element versionControlInfoElement) { + if (versionControlInfoElement == null) { + return null; + } + + final VersionControlInformationDTO dto = new VersionControlInformationDTO(); + dto.setRegistryId(getString(versionControlInfoElement, "registryId")); + dto.setBucketId(getString(versionControlInfoElement, "bucketId")); + dto.setFlowId(getString(versionControlInfoElement, "flowId")); + dto.setVersion(getInt(versionControlInfoElement, "version")); + return dto; + } + public static ConnectionDTO getConnection(final Element element) { final ConnectionDTO dto = new ConnectionDTO(); dto.setId(getString(element, "id")); dto.setName(getString(element, "name")); dto.setLabelIndex(getOptionalInt(element, "labelIndex")); dto.setzIndex(getOptionalLong(element, "zIndex")); + dto.setVersionedComponentId(getString(element, "versionedComponentId")); final List<PositionDTO> bends = new ArrayList<>(); final Element bendPointsElement = DomUtils.getChild(element, "bendPoints"); @@ -278,6 +298,7 @@ public class FlowFromDOMFactory { public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element, final StringEncryptor encryptor) { final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO(); dto.setId(getString(element, "id")); + dto.setVersionedComponentId(getString(element, "versionedComponentId")); dto.setName(getString(element, "name")); dto.setTargetUri(getString(element, "url")); dto.setTargetUris(getString(element, "urls")); @@ -302,6 +323,7 @@ public class FlowFromDOMFactory { public static LabelDTO getLabel(final Element element) { final LabelDTO dto = new LabelDTO(); dto.setId(getString(element, "id")); + dto.setVersionedComponentId(getString(element, "versionedComponentId")); dto.setLabel(getString(element, "value")); dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); final Size size = getSize(DomUtils.getChild(element, "size")); @@ -315,6 +337,7 @@ public class FlowFromDOMFactory { public static FunnelDTO getFunnel(final Element element) { final FunnelDTO dto = new FunnelDTO(); dto.setId(getString(element, "id")); + dto.setVersionedComponentId(getString(element, "versionedComponentId")); dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); return dto; @@ -323,6 +346,7 @@ public class FlowFromDOMFactory { public static PortDTO getPort(final Element element) { final PortDTO portDTO = new PortDTO(); portDTO.setId(getString(element, "id")); + portDTO.setVersionedComponentId(getString(element, "versionedComponentId")); portDTO.setPosition(getPosition(DomUtils.getChild(element, "position"))); portDTO.setName(getString(element, "name")); portDTO.setComments(getString(element, "comments")); @@ -370,6 +394,7 @@ public class FlowFromDOMFactory { final String targetId = getString(element, "targetId"); descriptor.setTargetId(targetId == null ? id : targetId); + descriptor.setVersionedComponentId(getString(element, "versionedComponentId")); descriptor.setName(getString(element, "name")); descriptor.setComments(getString(element, "comments")); descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks")); @@ -386,6 +411,7 @@ public class FlowFromDOMFactory { final ProcessorDTO dto = new ProcessorDTO(); dto.setId(getString(element, "id")); + dto.setVersionedComponentId(getString(element, "versionedComponentId")); dto.setName(getString(element, "name")); dto.setType(getString(element, "class")); dto.setBundle(getBundle(DomUtils.getChild(element, "bundle"))); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index bc28a25..ecf2438 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -39,6 +39,7 @@ import org.apache.nifi.persistence.TemplateSerializer; import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.util.CharacterFilterUtils; @@ -63,6 +64,7 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.OutputStream; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; /** @@ -151,10 +153,21 @@ public class StandardFlowSerializer implements FlowSerializer { final Element element = doc.createElement(elementName); parentElement.appendChild(element); addTextElement(element, "id", group.getIdentifier()); + addTextElement(element, "versionedComponentId", group.getVersionedComponentId()); addTextElement(element, "name", group.getName()); addPosition(element, group.getPosition()); addTextElement(element, "comment", group.getComments()); + final VersionControlInformation versionControlInfo = group.getVersionControlInformation(); + if (versionControlInfo != null) { + final Element versionControlInfoElement = doc.createElement("versionControlInformation"); + addTextElement(versionControlInfoElement, "registryId", versionControlInfo.getRegistryIdentifier()); + addTextElement(versionControlInfoElement, "bucketId", versionControlInfo.getBucketIdentifier()); + addTextElement(versionControlInfoElement, "flowId", versionControlInfo.getFlowIdentifier()); + addTextElement(versionControlInfoElement, "version", versionControlInfo.getVersion()); + element.appendChild(versionControlInfoElement); + } + for (final ProcessorNode processor : group.getProcessors()) { addProcessor(element, processor, scheduledStateLookup); } @@ -258,6 +271,7 @@ public class StandardFlowSerializer implements FlowSerializer { final Element element = doc.createElement("label"); parentElement.appendChild(element); addTextElement(element, "id", label.getIdentifier()); + addTextElement(element, "versionedComponentId", label.getVersionedComponentId()); addPosition(element, label.getPosition()); addSize(element, label.getSize()); @@ -272,6 +286,7 @@ public class StandardFlowSerializer implements FlowSerializer { final Element element = doc.createElement("funnel"); parentElement.appendChild(element); addTextElement(element, "id", funnel.getIdentifier()); + addTextElement(element, "versionedComponentId", funnel.getVersionedComponentId()); addPosition(element, funnel.getPosition()); } @@ -280,6 +295,7 @@ public class StandardFlowSerializer implements FlowSerializer { final Element element = doc.createElement("remoteProcessGroup"); parentElement.appendChild(element); addTextElement(element, "id", remoteRef.getIdentifier()); + addTextElement(element, "versionedComponentId", remoteRef.getVersionedComponentId()); addTextElement(element, "name", remoteRef.getName()); addPosition(element, remoteRef.getPosition()); addTextElement(element, "comment", remoteRef.getComments()); @@ -322,6 +338,7 @@ public class StandardFlowSerializer implements FlowSerializer { final Element element = doc.createElement(elementName); parentElement.appendChild(element); addTextElement(element, "id", port.getIdentifier()); + addTextElement(element, "versionedComponentId", port.getVersionedComponentId()); addTextElement(element, "name", port.getName()); addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); @@ -350,6 +367,7 @@ public class StandardFlowSerializer implements FlowSerializer { final Element element = doc.createElement(elementName); parentElement.appendChild(element); addTextElement(element, "id", port.getIdentifier()); + addTextElement(element, "versionedComponentId", port.getVersionedComponentId()); addTextElement(element, "name", port.getName()); addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); @@ -363,6 +381,7 @@ public class StandardFlowSerializer implements FlowSerializer { final Element element = doc.createElement(elementName); parentElement.appendChild(element); addTextElement(element, "id", port.getIdentifier()); + addTextElement(element, "versionedComponentId", port.getVersionedComponentId()); addTextElement(element, "name", port.getName()); addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); @@ -383,6 +402,7 @@ public class StandardFlowSerializer implements FlowSerializer { final Element element = doc.createElement("processor"); parentElement.appendChild(element); addTextElement(element, "id", processor.getIdentifier()); + addTextElement(element, "versionedComponentId", processor.getVersionedComponentId()); addTextElement(element, "name", processor.getName()); addPosition(element, processor.getPosition()); @@ -444,6 +464,7 @@ public class StandardFlowSerializer implements FlowSerializer { final Element element = doc.createElement("connection"); parentElement.appendChild(element); addTextElement(element, "id", connection.getIdentifier()); + addTextElement(element, "versionedComponentId", connection.getVersionedComponentId()); addTextElement(element, "name", connection.getName()); final Element bendPointsElement = doc.createElement("bendPoints"); @@ -500,6 +521,7 @@ public class StandardFlowSerializer implements FlowSerializer { public void addControllerService(final Element element, final ControllerServiceNode serviceNode) { final Element serviceElement = element.getOwnerDocument().createElement("controllerService"); addTextElement(serviceElement, "id", serviceNode.getIdentifier()); + addTextElement(serviceElement, "versionedComponentId", serviceNode.getVersionedComponentId()); addTextElement(serviceElement, "name", serviceNode.getName()); addTextElement(serviceElement, "comment", serviceNode.getComments()); addTextElement(serviceElement, "class", serviceNode.getCanonicalClassName()); @@ -544,6 +566,17 @@ public class StandardFlowSerializer implements FlowSerializer { element.appendChild(toAdd); } + private static void addTextElement(final Element element, final String name, final Optional<String> value) { + if (!value.isPresent()) { + return; + } + + final Document doc = element.getOwnerDocument(); + final Element toAdd = doc.createElement(name); + toAdd.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(value.get())); // value should already be filtered, but just in case ensure there are no invalid xml characters + element.appendChild(toAdd); + } + public static void addTemplate(final Element element, final Template template) { try { final byte[] serialized = TemplateSerializer.serialize(template.getDetails()); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 3faffd7..633f0ed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -198,6 +198,7 @@ public class ControllerServiceLoader { final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), coordinate, Collections.emptySet(), false); node.setName(dto.getName()); node.setComments(dto.getComments()); + node.setVersionedComponentId(dto.getVersionedComponentId()); return node; } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index f46f796..53fd166 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -72,6 +73,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i private final AtomicReference<ControllerServiceDetails> controllerServiceHolder = new AtomicReference<>(null); private final ControllerServiceProvider serviceProvider; private final ServiceStateTransition stateTransition = new ServiceStateTransition(); + private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -526,4 +528,26 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i return results != null ? results : Collections.emptySet(); } + @Override + public Optional<String> getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String versionedComponentId) { + boolean updated = false; + while (!updated) { + final String currentId = this.versionedComponentId.get(); + + if (currentId == null) { + updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); + } else if (currentId.equals(versionedComponentId)) { + return; + } else if (versionedComponentId == null) { + updated = this.versionedComponentId.compareAndSet(currentId, null); + } else { + throw new IllegalStateException(this + " is already under version control"); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 7c68475..3aa5084 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -268,6 +268,17 @@ public class FingerprintFactory { private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final Element processGroupElem, final FlowController controller) throws FingerprintException { // id appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "versionedComponentId")); + + final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation"); + if (versionControlInfo == null) { + builder.append("NO_VERSION_CONTROL_INFORMATION"); + } else { + appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "registryId")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "bucketId")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "flowId")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "version")); + } // processors final List<Element> processorElems = DomUtils.getChildElementsByTagName(processGroupElem, "processor"); @@ -344,6 +355,7 @@ public class FingerprintFactory { private StringBuilder addFlowFileProcessorFingerprint(final StringBuilder builder, final Element processorElem) throws FingerprintException { // id appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "id")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "versionedComponentId")); // class final NodeList childNodes = DomUtils.getChildNodesByTagName(processorElem, "class"); final String className = childNodes.item(0).getTextContent(); @@ -435,6 +447,7 @@ public class FingerprintFactory { private StringBuilder addPortFingerprint(final StringBuilder builder, final Element portElem) throws FingerprintException { // id appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "id")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "versionedComponentId")); appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "name")); final NodeList userAccessControlNodeList = DomUtils.getChildNodesByTagName(portElem, "userAccessControl"); @@ -471,13 +484,14 @@ public class FingerprintFactory { private StringBuilder addLabelFingerprint(final StringBuilder builder, final Element labelElem) { appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "id")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "versionedComponentId")); appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "value")); return builder; } private StringBuilder addRemoteProcessGroupFingerprint(final StringBuilder builder, final Element remoteProcessGroupElem) throws FingerprintException { - for (String tagName : new String[]{"id", "urls", "networkInterface", "timeout", "yieldPeriod", + for (String tagName : new String[] {"id", "versionedComponentId", "urls", "networkInterface", "timeout", "yieldPeriod", "transportProtocol", "proxyHost", "proxyPort", "proxyUser", "proxyPassword"}) { final String value = getFirstValue(DomUtils.getChildNodesByTagName(remoteProcessGroupElem, tagName)); if (isEncrypted(value)) { @@ -544,7 +558,7 @@ public class FingerprintFactory { } private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) { - for (final String childName : new String[] {"id", "targetId", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) { + for (final String childName : new String[] {"id", "targetId", "versionedComponentId", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) { appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName)); } @@ -555,6 +569,7 @@ public class FingerprintFactory { private StringBuilder addConnectionFingerprint(final StringBuilder builder, final Element connectionElem) throws FingerprintException { // id appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "id")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "versionedComponentId")); // source id appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "sourceId")); // source group id @@ -583,11 +598,13 @@ public class FingerprintFactory { private StringBuilder addFunnelFingerprint(final StringBuilder builder, final Element funnelElem) throws FingerprintException { // id appendFirstValue(builder, DomUtils.getChildNodesByTagName(funnelElem, "id")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(funnelElem, "versionedComponentId")); return builder; } private void addControllerServiceFingerprint(final StringBuilder builder, final ControllerServiceDTO dto) { builder.append(dto.getId()); + builder.append(dto.getVersionedComponentId()); builder.append(dto.getType()); builder.append(dto.getName());
