Repository: nifi-registry Updated Branches: refs/heads/master 589253778 -> a2f639f37
NIFIREG-57: Added EvolvingDifferenceDescriptor vs. StaticDifferenceDescriptor This closes #42. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-registry/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-registry/commit/a2f639f3 Tree: http://git-wip-us.apache.org/repos/asf/nifi-registry/tree/a2f639f3 Diff: http://git-wip-us.apache.org/repos/asf/nifi-registry/diff/a2f639f3 Branch: refs/heads/master Commit: a2f639f37819b464794fe4f790731026d34c0c55 Parents: 5892537 Author: Mark Payne <[email protected]> Authored: Tue Nov 21 14:40:00 2017 -0500 Committer: Bryan Bende <[email protected]> Committed: Mon Nov 27 18:02:13 2017 -0500 ---------------------------------------------------------------------- .../registry/flow/VersionedFlowCoordinates.java | 15 ++ .../registry/flow/VersionedRemoteGroupPort.java | 23 ++- .../ConciseEvolvingDifferenceDescriptor.java | 79 +++++++++++ .../flow/diff/EvolvingDifferenceDescriptor.java | 6 + .../flow/diff/StandardFlowComparator.java | 140 +++++++------------ .../flow/diff/StaticDifferenceDescriptor.java | 29 ++++ 6 files changed, 200 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java ---------------------------------------------------------------------- diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java index ac98933..8e39c5b 100644 --- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java +++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java @@ -26,6 +26,7 @@ public class VersionedFlowCoordinates { private String bucketId; private String flowId; private int version; + private Boolean latest; @ApiModelProperty("The URL of the Flow Registry that contains the flow") public String getRegistryUrl() { @@ -63,6 +64,15 @@ public class VersionedFlowCoordinates { this.version = version; } + @ApiModelProperty("Whether or not these coordinates point to the latest version of the flow") + public Boolean getLatest() { + return latest; + } + + public void setLatest(Boolean latest) { + this.latest = latest; + } + @Override public int hashCode() { return Objects.hash(registryUrl, bucketId, flowId, version); @@ -83,4 +93,9 @@ public class VersionedFlowCoordinates { final VersionedFlowCoordinates other = (VersionedFlowCoordinates) obj; return Objects.equals(registryUrl, other.registryUrl) && Objects.equals(bucketId, other.bucketId) && Objects.equals(flowId, other.flowId) && Objects.equals(version, other.version); } + + @Override + public String toString() { + return "VersionedFlowCoordinates[bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version + ", registryUrl=" + registryUrl + "]"; + } } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java index 73b037e..ca85ce4 100644 --- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java +++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java @@ -22,11 +22,12 @@ import java.util.Objects; import io.swagger.annotations.ApiModelProperty; public class VersionedRemoteGroupPort extends VersionedComponent { - private String groupId; + private String remoteGroupId; private Integer concurrentlySchedulableTaskCount; private Boolean useCompression; private BatchSize batchSize; private ComponentType componentType; + private String targetId; @ApiModelProperty("The number of task that may transmit flowfiles to the target port concurrently.") public Integer getConcurrentlySchedulableTaskCount() { @@ -38,12 +39,12 @@ public class VersionedRemoteGroupPort extends VersionedComponent { } @ApiModelProperty("The id of the remote process group that the port resides in.") - public String getGroupId() { - return groupId; + public String getRemoteGroupId() { + return remoteGroupId; } - public void setGroupId(String groupId) { - this.groupId = groupId; + public void setRemoteGroupId(String groupId) { + this.remoteGroupId = groupId; } @@ -61,10 +62,19 @@ public class VersionedRemoteGroupPort extends VersionedComponent { return batchSize; } - public void setBatchSettings(BatchSize batchSize) { + public void setBatchSize(BatchSize batchSize) { this.batchSize = batchSize; } + @ApiModelProperty("The ID of the port on the target NiFi instance") + public String getTargetId() { + return targetId; + } + + public void setTargetId(final String targetId) { + this.targetId = targetId; + } + @Override public int hashCode() { return 923847 + String.valueOf(getName()).hashCode(); @@ -88,6 +98,7 @@ public class VersionedRemoteGroupPort extends VersionedComponent { return componentType; } + @Override public void setComponentType(final ComponentType componentType) { if (componentType != ComponentType.REMOTE_INPUT_PORT && componentType != ComponentType.REMOTE_OUTPUT_PORT) { throw new IllegalArgumentException(); http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java new file mode 100644 index 0000000..eb8c874 --- /dev/null +++ b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import java.util.Objects; + +import org.apache.nifi.registry.flow.VersionedComponent; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; + +/** + * Describes differences between flows as if Flow A is an 'earlier version' of the same flow than Flow B. + * This provides verbiage such as "Processor with ID 123 was added to flow." + */ +public class ConciseEvolvingDifferenceDescriptor implements DifferenceDescriptor { + + @Override + public String describeDifference(final DifferenceType type, final String flowAName, final String flowBName, final VersionedComponent componentA, + final VersionedComponent componentB, final Object valueA, final Object valueB) { + + final String description; + switch (type) { + case COMPONENT_ADDED: + description = String.format("%s was added", componentB.getComponentType().getTypeName()); + break; + case COMPONENT_REMOVED: + description = String.format("%s was removed", componentA.getComponentType().getTypeName()); + break; + case PROPERTY_ADDED: + description = String.format("Property '%s' was added", valueB); + break; + case PROPERTY_REMOVED: + description = String.format("Property '%s' was removed", valueA); + break; + case VARIABLE_ADDED: + description = String.format("Variable '%s' was added", valueB); + break; + case VARIABLE_REMOVED: + description = String.format("Variable '%s' was removed", valueA); + break; + case VERSIONED_FLOW_COORDINATES_CHANGED: + if (valueA instanceof VersionedFlowCoordinates && valueB instanceof VersionedFlowCoordinates) { + final VersionedFlowCoordinates coordinatesA = (VersionedFlowCoordinates) valueA; + final VersionedFlowCoordinates coordinatesB = (VersionedFlowCoordinates) valueB; + + // If the two vary only by version, then use a more concise message. If anything else is different, then use a fully explanation. + if (Objects.equals(coordinatesA.getRegistryUrl(), coordinatesB.getRegistryUrl()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId()) + && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && coordinatesA.getVersion() != coordinatesB.getVersion()) { + + description = String.format("Flow Version changed from %s to %s", coordinatesA.getVersion(), coordinatesB.getVersion()); + break; + } + } + + description = String.format("From '%s' to '%s'", valueA, valueB); + break; + default: + description = String.format("From '%s' to '%s'", valueA, valueB); + break; + } + + return description; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java index d1309b4..a4d1a65 100644 --- a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java +++ b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java @@ -43,6 +43,12 @@ public class EvolvingDifferenceDescriptor implements DifferenceDescriptor { case PROPERTY_REMOVED: description = String.format("Property '%s' was removed from %s with ID %s", valueA, componentA.getComponentType().getTypeName(), componentA.getIdentifier()); break; + case VARIABLE_ADDED: + description = String.format("Variable '%s' was added to Process Group with ID %s", valueB, componentB.getIdentifier()); + break; + case VARIABLE_REMOVED: + description = String.format("Variable '%s' was removed from Process Group with ID %s", valueA, componentA.getIdentifier()); + break; default: description = String.format("%s for %s with ID %s from '%s' to '%s'", type.getDescription(), componentA.getComponentType().getTypeName(), componentA.getIdentifier(), valueA, valueB); http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java ---------------------------------------------------------------------- diff --git a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java index a54dab1..260909e 100644 --- a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java +++ b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java @@ -99,10 +99,11 @@ public class StandardFlowComparator implements FlowComparator { private boolean compareComponents(final VersionedComponent componentA, final VersionedComponent componentB, final Set<FlowDifference> differences) { - return compareComponents(componentA, componentB, differences, true); + return compareComponents(componentA, componentB, differences, true, true); } - private boolean compareComponents(final VersionedComponent componentA, final VersionedComponent componentB, final Set<FlowDifference> differences, final boolean compareNamePos) { + private boolean compareComponents(final VersionedComponent componentA, final VersionedComponent componentB, final Set<FlowDifference> differences, final boolean compareNamePos, + final boolean compareComments) { if (componentA == null) { differences.add(difference(DifferenceType.COMPONENT_ADDED, componentA, componentB, componentA, componentB)); return true; @@ -113,11 +114,13 @@ public class StandardFlowComparator implements FlowComparator { return true; } - addIfDifferent(differences, DifferenceType.COMMENTS_CHANGED, componentA, componentB, c -> c.getComments()); + if (compareComments) { + addIfDifferent(differences, DifferenceType.COMMENTS_CHANGED, componentA, componentB, VersionedComponent::getComments); + } if (compareNamePos) { - addIfDifferent(differences, DifferenceType.NAME_CHANGED, componentA, componentB, c -> c.getName()); - addIfDifferent(differences, DifferenceType.POSITION_CHANGED, componentA, componentB, c -> c.getPosition()); + addIfDifferent(differences, DifferenceType.NAME_CHANGED, componentA, componentB, VersionedComponent::getName); + addIfDifferent(differences, DifferenceType.POSITION_CHANGED, componentA, componentB, VersionedComponent::getPosition); } return false; @@ -128,18 +131,18 @@ public class StandardFlowComparator implements FlowComparator { return; } - addIfDifferent(differences, DifferenceType.ANNOTATION_DATA_CHANGED, processorA, processorB, p -> p.getAnnotationData()); - addIfDifferent(differences, DifferenceType.AUTO_TERMINATED_RELATIONSHIPS_CHANGED, processorA, processorB, p -> p.getAutoTerminatedRelationships()); - addIfDifferent(differences, DifferenceType.BULLETIN_LEVEL_CHANGED, processorA, processorB, p -> p.getBulletinLevel()); - addIfDifferent(differences, DifferenceType.BUNDLE_CHANGED, processorA, processorB, p -> p.getBundle()); - addIfDifferent(differences, DifferenceType.CONCURRENT_TASKS_CHANGED, processorA, processorB, p -> p.getConcurrentlySchedulableTaskCount()); - addIfDifferent(differences, DifferenceType.EXECUTION_MODE_CHANGED, processorA, processorB, p -> p.getExecutionNode()); - addIfDifferent(differences, DifferenceType.PENALTY_DURATION_CHANGED, processorA, processorB, p -> p.getPenaltyDuration()); - addIfDifferent(differences, DifferenceType.RUN_DURATION_CHANGED, processorA, processorB, p -> p.getRunDurationMillis()); - addIfDifferent(differences, DifferenceType.SCHEDULING_PERIOD_CHANGED, processorA, processorB, p -> p.getSchedulingPeriod()); - addIfDifferent(differences, DifferenceType.SCHEDULING_STRATEGY_CHANGED, processorA, processorB, p -> p.getSchedulingStrategy()); - addIfDifferent(differences, DifferenceType.STYLE_CHANGED, processorA, processorB, p -> p.getStyle()); - addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, processorA, processorB, p -> p.getYieldDuration()); + addIfDifferent(differences, DifferenceType.ANNOTATION_DATA_CHANGED, processorA, processorB, VersionedProcessor::getAnnotationData); + addIfDifferent(differences, DifferenceType.AUTO_TERMINATED_RELATIONSHIPS_CHANGED, processorA, processorB, VersionedProcessor::getAutoTerminatedRelationships); + addIfDifferent(differences, DifferenceType.BULLETIN_LEVEL_CHANGED, processorA, processorB, VersionedProcessor::getBulletinLevel); + addIfDifferent(differences, DifferenceType.BUNDLE_CHANGED, processorA, processorB, VersionedProcessor::getBundle); + addIfDifferent(differences, DifferenceType.CONCURRENT_TASKS_CHANGED, processorA, processorB, VersionedProcessor::getConcurrentlySchedulableTaskCount); + addIfDifferent(differences, DifferenceType.EXECUTION_MODE_CHANGED, processorA, processorB, VersionedProcessor::getExecutionNode); + addIfDifferent(differences, DifferenceType.PENALTY_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getPenaltyDuration); + addIfDifferent(differences, DifferenceType.RUN_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getRunDurationMillis); + addIfDifferent(differences, DifferenceType.SCHEDULING_PERIOD_CHANGED, processorA, processorB, VersionedProcessor::getSchedulingPeriod); + addIfDifferent(differences, DifferenceType.SCHEDULING_STRATEGY_CHANGED, processorA, processorB, VersionedProcessor::getSchedulingStrategy); + addIfDifferent(differences, DifferenceType.STYLE_CHANGED, processorA, processorB, VersionedProcessor::getStyle); + addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getYieldDuration); compareProperties(processorA, processorB, processorA.getProperties(), processorB.getProperties(), differences); } @@ -148,8 +151,8 @@ public class StandardFlowComparator implements FlowComparator { return; } - addIfDifferent(differences, DifferenceType.ANNOTATION_DATA_CHANGED, serviceA, serviceB, s -> s.getAnnotationData()); - addIfDifferent(differences, DifferenceType.BUNDLE_CHANGED, serviceA, serviceB, s -> s.getBundle()); + addIfDifferent(differences, DifferenceType.ANNOTATION_DATA_CHANGED, serviceA, serviceB, VersionedControllerService::getAnnotationData); + addIfDifferent(differences, DifferenceType.BUNDLE_CHANGED, serviceA, serviceB, VersionedControllerService::getBundle); compareProperties(serviceA, serviceB, serviceA.getProperties(), serviceB.getProperties(), differences); } @@ -183,39 +186,6 @@ public class StandardFlowComparator implements FlowComparator { }); } - private void compareVariables(final VersionedProcessGroup groupA, final VersionedProcessGroup groupB, final Set<FlowDifference> differences) { - - final Map<String, String> variablesA = groupA.getVariables(); - final Map<String, String> variablesB = groupB.getVariables(); - - if (variablesA != null) { - variablesA.entrySet().stream() - .forEach(entry -> { - final String valueA = entry.getValue(); - final String valueB = variablesB.get(entry.getKey()); - - if (valueA == null && valueB != null) { - differences.add(difference(DifferenceType.VARIABLE_ADDED, groupA, groupB, entry.getKey(), entry.getKey())); - } else if (valueA != null && valueB == null) { - differences.add(difference(DifferenceType.VARIABLE_REMOVED, groupA, groupB, entry.getKey(), entry.getKey())); - } - }); - } - - if (variablesB != null) { - variablesB.entrySet().stream() - .forEach(entry -> { - final String valueA = variablesA.get(entry.getKey()); - final String valueB = entry.getValue(); - - // If there are any properties for component B that do not exist for Component A, add those as differences as well. - if (valueA == null && valueB != null) { - differences.add(difference(DifferenceType.VARIABLE_ADDED, groupA, groupB, entry.getKey(), entry.getKey())); - } - }); - } - } - private void compare(final VersionedFunnel funnelA, final VersionedFunnel funnelB, final Set<FlowDifference> differences) { if (compareComponents(funnelA, funnelB, differences)) { @@ -228,10 +198,10 @@ public class StandardFlowComparator implements FlowComparator { return; } - addIfDifferent(differences, DifferenceType.LABEL_VALUE_CHANGED, labelA, labelB, l -> l.getLabel()); - addIfDifferent(differences, DifferenceType.POSITION_CHANGED, labelA, labelB, l -> l.getHeight()); - addIfDifferent(differences, DifferenceType.POSITION_CHANGED, labelA, labelB, l -> l.getWidth()); - addIfDifferent(differences, DifferenceType.STYLE_CHANGED, labelA, labelB, l -> l.getStyle()); + addIfDifferent(differences, DifferenceType.LABEL_VALUE_CHANGED, labelA, labelB, VersionedLabel::getLabel); + addIfDifferent(differences, DifferenceType.POSITION_CHANGED, labelA, labelB, VersionedLabel::getHeight); + addIfDifferent(differences, DifferenceType.POSITION_CHANGED, labelA, labelB, VersionedLabel::getWidth); + addIfDifferent(differences, DifferenceType.STYLE_CHANGED, labelA, labelB, VersionedLabel::getStyle); } private void compare(final VersionedPort portA, final VersionedPort portB, final Set<FlowDifference> differences) { @@ -241,17 +211,17 @@ public class StandardFlowComparator implements FlowComparator { } private void compare(final VersionedRemoteProcessGroup rpgA, final VersionedRemoteProcessGroup rpgB, final Set<FlowDifference> differences) { - if (compareComponents(rpgA, rpgB, differences)) { + if (compareComponents(rpgA, rpgB, differences, true, false)) { // do not compare comments for RPG because they come from remote system, not our local flow return; } - addIfDifferent(differences, DifferenceType.RPG_COMMS_TIMEOUT_CHANGED, rpgA, rpgB, r -> r.getCommunicationsTimeout()); - addIfDifferent(differences, DifferenceType.RPG_NETWORK_INTERFACE_CHANGED, rpgA, rpgB, r -> rpgA.getLocalNetworkInterface()); - addIfDifferent(differences, DifferenceType.RPG_PROXY_HOST_CHANGED, rpgA, rpgB, r -> rpgA.getProxyHost()); - addIfDifferent(differences, DifferenceType.RPG_PROXY_PORT_CHANGED, rpgA, rpgB, r -> rpgA.getProxyPort()); - addIfDifferent(differences, DifferenceType.RPG_PROXY_USER_CHANGED, rpgA, rpgB, r -> rpgA.getProxyUser()); - addIfDifferent(differences, DifferenceType.RPG_TRANSPORT_PROTOCOL_CHANGED, rpgA, rpgB, r -> rpgA.getTransportProtocol()); - addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, rpgA, rpgB, r -> rpgA.getYieldDuration()); + addIfDifferent(differences, DifferenceType.RPG_COMMS_TIMEOUT_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getCommunicationsTimeout); + addIfDifferent(differences, DifferenceType.RPG_NETWORK_INTERFACE_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getLocalNetworkInterface); + addIfDifferent(differences, DifferenceType.RPG_PROXY_HOST_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getProxyHost); + addIfDifferent(differences, DifferenceType.RPG_PROXY_PORT_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getProxyPort); + addIfDifferent(differences, DifferenceType.RPG_PROXY_USER_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getProxyUser); + addIfDifferent(differences, DifferenceType.RPG_TRANSPORT_PROTOCOL_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getTransportProtocol); + addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getYieldDuration); differences.addAll(compareComponents(rpgA.getInputPorts(), rpgB.getInputPorts(), (a, b, diffs) -> compare(a, b, diffs))); differences.addAll(compareComponents(rpgA.getOutputPorts(), rpgB.getOutputPorts(), (a, b, diffs) -> compare(a, b, diffs))); @@ -262,14 +232,14 @@ public class StandardFlowComparator implements FlowComparator { return; } - addIfDifferent(differences, DifferenceType.REMOTE_PORT_BATCH_SIZE_CHANGED, portA, portB, p -> p.getBatchSize()); - addIfDifferent(differences, DifferenceType.REMOTE_PORT_COMPRESSION_CHANGED, portA, portB, p -> p.isUseCompression()); - addIfDifferent(differences, DifferenceType.CONCURRENT_TASKS_CHANGED, portA, portB, p -> p.getConcurrentlySchedulableTaskCount()); + addIfDifferent(differences, DifferenceType.REMOTE_PORT_BATCH_SIZE_CHANGED, portA, portB, VersionedRemoteGroupPort::getBatchSize); + addIfDifferent(differences, DifferenceType.REMOTE_PORT_COMPRESSION_CHANGED, portA, portB, VersionedRemoteGroupPort::isUseCompression); + addIfDifferent(differences, DifferenceType.CONCURRENT_TASKS_CHANGED, portA, portB, VersionedRemoteGroupPort::getConcurrentlySchedulableTaskCount); } private void compare(final VersionedProcessGroup groupA, final VersionedProcessGroup groupB, final Set<FlowDifference> differences, final boolean compareNamePos) { - if (compareComponents(groupA, groupB, differences, compareNamePos)) { + if (compareComponents(groupA, groupB, differences, compareNamePos, true)) { return; } @@ -283,19 +253,17 @@ public class StandardFlowComparator implements FlowComparator { return; } - addIfDifferent(differences, DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED, groupA, groupB, g -> g.getVersionedFlowCoordinates()); + addIfDifferent(differences, DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED, groupA, groupB, VersionedProcessGroup::getVersionedFlowCoordinates); - differences.addAll(compareComponents(groupA.getConnections(), groupB.getConnections(), (a, b, diffs) -> compare(a, b, diffs))); - differences.addAll(compareComponents(groupA.getProcessors(), groupB.getProcessors(), (a, b, diffs) -> compare(a, b, diffs))); - differences.addAll(compareComponents(groupA.getControllerServices(), groupB.getControllerServices(), (a, b, diffs) -> compare(a, b, diffs))); - differences.addAll(compareComponents(groupA.getFunnels(), groupB.getFunnels(), (a, b, diffs) -> compare(a, b, diffs))); - differences.addAll(compareComponents(groupA.getInputPorts(), groupB.getInputPorts(), (a, b, diffs) -> compare(a, b, diffs))); - differences.addAll(compareComponents(groupA.getLabels(), groupB.getLabels(), (a, b, diffs) -> compare(a, b, diffs))); - differences.addAll(compareComponents(groupA.getOutputPorts(), groupB.getOutputPorts(), (a, b, diffs) -> compare(a, b, diffs))); + differences.addAll(compareComponents(groupA.getConnections(), groupB.getConnections(), this::compare)); + differences.addAll(compareComponents(groupA.getProcessors(), groupB.getProcessors(), this::compare)); + differences.addAll(compareComponents(groupA.getControllerServices(), groupB.getControllerServices(), this::compare)); + differences.addAll(compareComponents(groupA.getFunnels(), groupB.getFunnels(), this::compare)); + differences.addAll(compareComponents(groupA.getInputPorts(), groupB.getInputPorts(), this::compare)); + differences.addAll(compareComponents(groupA.getLabels(), groupB.getLabels(), this::compare)); + differences.addAll(compareComponents(groupA.getOutputPorts(), groupB.getOutputPorts(), this::compare)); differences.addAll(compareComponents(groupA.getProcessGroups(), groupB.getProcessGroups(), (a, b, diffs) -> compare(a, b, diffs, true))); - differences.addAll(compareComponents(groupA.getRemoteProcessGroups(), groupB.getRemoteProcessGroups(), (a, b, diffs) -> compare(a, b, diffs))); - - compareVariables(groupA, groupB, differences); + differences.addAll(compareComponents(groupA.getRemoteProcessGroups(), groupB.getRemoteProcessGroups(), this::compare)); } @@ -304,13 +272,13 @@ public class StandardFlowComparator implements FlowComparator { return; } - addIfDifferent(differences, DifferenceType.BACKPRESSURE_DATA_SIZE_THRESHOLD_CHANGED, connectionA, connectionB, c -> c.getBackPressureDataSizeThreshold()); - addIfDifferent(differences, DifferenceType.BACKPRESSURE_OBJECT_THRESHOLD_CHANGED, connectionA, connectionB, c -> c.getBackPressureObjectThreshold()); - addIfDifferent(differences, DifferenceType.BENDPOINTS_CHANGED, connectionA, connectionB, c -> c.getBends()); - addIfDifferent(differences, DifferenceType.DESTINATION_CHANGED, connectionA, connectionB, c -> c.getDestination()); - addIfDifferent(differences, DifferenceType.FLOWFILE_EXPIRATION_CHANGED, connectionA, connectionB, c -> c.getFlowFileExpiration()); - addIfDifferent(differences, DifferenceType.PRIORITIZERS_CHANGED, connectionA, connectionB, c -> c.getPrioritizers()); - addIfDifferent(differences, DifferenceType.SELECTED_RELATIONSHIPS_CHANGED, connectionA, connectionB, c -> c.getSelectedRelationships()); + addIfDifferent(differences, DifferenceType.BACKPRESSURE_DATA_SIZE_THRESHOLD_CHANGED, connectionA, connectionB, VersionedConnection::getBackPressureDataSizeThreshold); + addIfDifferent(differences, DifferenceType.BACKPRESSURE_OBJECT_THRESHOLD_CHANGED, connectionA, connectionB, VersionedConnection::getBackPressureObjectThreshold); + addIfDifferent(differences, DifferenceType.BENDPOINTS_CHANGED, connectionA, connectionB, VersionedConnection::getBends); + addIfDifferent(differences, DifferenceType.DESTINATION_CHANGED, connectionA, connectionB, VersionedConnection::getDestination); + addIfDifferent(differences, DifferenceType.FLOWFILE_EXPIRATION_CHANGED, connectionA, connectionB, VersionedConnection::getFlowFileExpiration); + addIfDifferent(differences, DifferenceType.PRIORITIZERS_CHANGED, connectionA, connectionB, VersionedConnection::getPrioritizers); + addIfDifferent(differences, DifferenceType.SELECTED_RELATIONSHIPS_CHANGED, connectionA, connectionB, VersionedConnection::getSelectedRelationships); addIfDifferent(differences, DifferenceType.SOURCE_CHANGED, connectionA, connectionB, c -> c.getSource().getId()); } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java index 42fe5e3..6894923 100644 --- a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java +++ b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java @@ -17,7 +17,10 @@ package org.apache.nifi.registry.flow.diff; +import java.util.Objects; + import org.apache.nifi.registry.flow.VersionedComponent; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; /** * Describes differences between flows as if the flows are two disparate flows that are being @@ -47,6 +50,32 @@ public class StaticDifferenceDescriptor implements DifferenceDescriptor { description = String.format("Property '%s' exists for %s with ID %s in %s but not in %s", valueA, componentA.getComponentType().getTypeName(), componentA.getIdentifier(), flowAName, flowBName); break; + case VARIABLE_ADDED: + description = String.format("Variable '%s' exists for Process Group with ID %s in %s but not in %s", + valueB, componentB.getIdentifier(), flowBName, flowAName); + break; + case VARIABLE_REMOVED: + description = String.format("Variable '%s' exists for Process Group with ID %s in %s but not in %s", + valueA, componentA.getIdentifier(), flowAName, flowBName); + break; + case VERSIONED_FLOW_COORDINATES_CHANGED: + if (valueA instanceof VersionedFlowCoordinates && valueB instanceof VersionedFlowCoordinates) { + final VersionedFlowCoordinates coordinatesA = (VersionedFlowCoordinates) valueA; + final VersionedFlowCoordinates coordinatesB = (VersionedFlowCoordinates) valueB; + + // If the two vary only by version, then use a more concise message. If anything else is different, then use a fully explanation. + if (Objects.equals(coordinatesA.getRegistryUrl(), coordinatesB.getRegistryUrl()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId()) + && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && coordinatesA.getVersion() != coordinatesB.getVersion()) { + + description = String.format("Flow Version is %s in %s but %s in %s", coordinatesA.getVersion(), flowAName, coordinatesB.getVersion(), flowBName); + break; + } + } + + description = String.format("%s for %s with ID %s; flow '%s' has value %s; flow '%s' has value %s", + type.getDescription(), componentA.getComponentType().getTypeName(), componentA.getIdentifier(), + flowAName, valueA, flowBName, valueB); + break; default: description = String.format("%s for %s with ID %s; flow '%s' has value %s; flow '%s' has value %s", type.getDescription(), componentA.getComponentType().getTypeName(), componentA.getIdentifier(),
