NIFI-1799 Implements auto-scaling flow layout: added utility class to scale positions of components on the canvas; extracted get/setPosition methods from ProcessGroup, RemoteProcessGroup, Label, and Connectable into new interface Positionable; added interface method for finding all Positionables in a ProcessGroup to the ProcessGroup interface and added implementation to StandardProcessGroup; added test flow for position rescaling; added Spock config to POM and a spec for testing the scaling of Positionables; forced Surefire to use JUnit (TestNG was on classpath and Surefire seems to prioritize that over JUnit); added check in StandardFlowSynchronizer to scale positions only when flow encoding version is less than 1.0; added spec for StandardFlowSynchronizer; updated FlowConfiguration.xsd to allow encoding-version attribute; added new test flow used in StandardFlowSynchronizerSpec.
This closes #442. Signed-off-by: Andy LoPresto <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/433db235 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/433db235 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/433db235 Branch: refs/heads/master Commit: 433db2356710597d8ca813737a5256e1102878be Parents: 7923fd0 Author: Jeff Storck <[email protected]> Authored: Tue May 10 16:07:11 2016 -0400 Committer: Andy LoPresto <[email protected]> Committed: Tue May 24 14:26:28 2016 -0700 ---------------------------------------------------------------------- .../apache/nifi/connectable/Connectable.java | 14 +- .../apache/nifi/connectable/Positionable.java | 35 + .../org/apache/nifi/controller/label/Label.java | 12 +- .../org/apache/nifi/groups/ProcessGroup.java | 21 +- .../apache/nifi/groups/RemoteProcessGroup.java | 18 +- .../nifi-framework/nifi-framework-core/pom.xml | 14 +- .../apache/nifi/controller/PositionScaler.java | 72 + .../controller/StandardFlowSynchronizer.java | 9 + .../nifi/groups/StandardProcessGroup.java | 15 + .../src/main/resources/FlowConfiguration.xsd | 3 +- .../nifi/controller/PositionScalerSpec.groovy | 49 + .../StandardFlowSynchronizerSpec.groovy | 244 +++ .../service/mock/MockProcessGroup.java | 18 +- .../conf/scale-positions-flow-0.7.0.xml | 1533 ++++++++++++++++++ pom.xml | 19 + 15 files changed, 2023 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java index d430599..eeebeae 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java @@ -32,7 +32,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy; /** * Represents a connectable component to which or from which data can flow. */ -public interface Connectable extends Triggerable, Authorizable { +public interface Connectable extends Triggerable, Authorizable, Positionable { /** * @return the unique identifier for this <code>Connectable</code> @@ -107,18 +107,6 @@ public interface Connectable extends Triggerable, Authorizable { Set<Connection> getConnections(Relationship relationship); /** - * @return the position on the graph where this Connectable is located - */ - Position getPosition(); - - /** - * Updates this component's position on the graph - * - * @param position new position - */ - void setPosition(Position position); - - /** * @return the name of this Connectable */ String getName(); http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Positionable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Positionable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Positionable.java new file mode 100644 index 0000000..81a7164 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Positionable.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.connectable; + +/** + * Represents a component that can be positioned with X,Y coordinates on the canvas. 
+ */ +public interface Positionable { + + /** + * @return the position on the graph where this Connectable is located + */ + Position getPosition(); + + /** + * Updates this component's position on the graph + * + * @param position new position + */ + void setPosition(Position position); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java index 2a95f13..352f73f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java @@ -16,20 +16,16 @@ */ package org.apache.nifi.controller.label; -import java.util.Map; - import org.apache.nifi.authorization.resource.Authorizable; -import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Positionable; import org.apache.nifi.connectable.Size; import org.apache.nifi.groups.ProcessGroup; -public interface Label extends Authorizable { - - String getIdentifier(); +import java.util.Map; - Position getPosition(); +public interface Label extends Authorizable, Positionable { - void setPosition(Position position); + String getIdentifier(); Map<String, String> getStyle(); http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 8d026f0..455a3c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -21,7 +21,7 @@ import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; -import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Positionable; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; @@ -45,7 +45,7 @@ import java.util.function.Predicate; * <p> * MUST BE THREAD-SAFE</p> */ -public interface ProcessGroup extends Authorizable { +public interface ProcessGroup extends Authorizable, Positionable { /** * Predicate for filtering schedulable Processors. 
@@ -98,17 +98,6 @@ public interface ProcessGroup extends Authorizable { void setName(String name); /** - * Updates the position of where this ProcessGroup is located in the graph - * @param position new position - */ - void setPosition(Position position); - - /** - * @return the position of where this ProcessGroup is located in the graph - */ - Position getPosition(); - - /** * @return the user-set comments about this ProcessGroup, or * <code>null</code> if no comments have been set */ @@ -740,6 +729,12 @@ public interface ProcessGroup extends Authorizable { Connectable findConnectable(String identifier); /** + * @return a Set of all {@link org.apache.nifi.connectable.Positionable}s contained within this + * {@link ProcessGroup} and any child {@link ProcessGroup}s + */ + Set<Positionable> findAllPositionables(); + + /** * Moves all of the components whose ID's are specified within the given * {@link Snippet} from this ProcessGroup into the given destination * ProcessGroup http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index cf0820b..22ccb43 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -16,18 +16,18 @@ */ package org.apache.nifi.groups; -import java.net.URI; -import java.util.Date; -import java.util.Set; -import 
java.util.concurrent.TimeUnit; - import org.apache.nifi.authorization.resource.Authorizable; -import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Positionable; import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.RemoteGroupPort; -public interface RemoteProcessGroup extends Authorizable { +import java.net.URI; +import java.util.Date; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public interface RemoteProcessGroup extends Authorizable, Positionable { String getIdentifier(); @@ -37,10 +37,6 @@ public interface RemoteProcessGroup extends Authorizable { void setProcessGroup(ProcessGroup group); - void setPosition(Position position); - - Position getPosition(); - String getComments(); void setComments(String comments); http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index ab318d3..90d942c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -13,7 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.nifi</groupId> @@ -158,6 +159,17 @@ <artifactId>nifi-mock</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.spockframework</groupId> + <artifactId>spock-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/PositionScaler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/PositionScaler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/PositionScaler.java new file mode 100644 index 0000000..e0bb2c7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/PositionScaler.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Positionable; +import org.apache.nifi.groups.ProcessGroup; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Provides utility to scale the positions of {@link Positionable}s and bend points of {@link Connection}s + * by a given factor. + */ +class PositionScaler { + + /** + * Scales the positions of all {@link Position}s in the given {@link ProcessGroup} by + * the provided factor. This method replaces all {@link Position}s in each {@link Positionable} + * in the {@link ProcessGroup} with a new scaled {@link Position}. 
+ * + * @param processGroup containing the {@link Positionable}s to be scaled + * @param factorX used to scale a {@link Positionable}'s X-coordinate position + * @param factorY used to scale a {@link Positionable}'s Y-coordinate position + */ + public static void scale(ProcessGroup processGroup, double factorX, double factorY) { + processGroup.findAllPositionables().stream().forEach(p -> scale(p, factorX, factorY)); + Map<Connection, List<Position>> bendPointsByConnection = + processGroup.findAllConnections().stream().collect(Collectors.toMap(connection -> connection, Connection::getBendPoints)); + bendPointsByConnection.entrySet().stream() + .forEach(connectionListEntry -> connectionListEntry.getKey().setBendPoints(connectionListEntry.getValue().stream() + .map(p -> scalePosition(p, factorX, factorY)).collect(Collectors.toList()))); + + } + + /** + * Scales the {@link Position} of the given {@link Positionable} by the provided factor. This method + * replaces the {@link Position} in the {@link Positionable} with a new scaled {@link Position}. 
+ * + * @param positionable containing a {@link Position} to scale + * @param factorX used to scale a {@link Positionable}'s X-coordinate position + * @param factorY used to scale a {@link Positionable}'s Y-coordinate position + */ + public static void scale(Positionable positionable, double factorX, double factorY) { + final Position startingPosition = positionable.getPosition(); + final Position scaledPosition = scalePosition(startingPosition, factorX, factorY); + positionable.setPosition(scaledPosition); + } + + private static Position scalePosition(Position position, double factorX, double factorY) { + return new Position(position.getX() * factorX, + position.getY() * factorY); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 254b75c..93b9761 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -277,6 +277,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + scaleRootGroup(rootGroup, encodingVersion); + final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks"); if (reportingTasksElement != null) { final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); @@ -311,6 
+313,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + void scaleRootGroup(ProcessGroup rootGroup, FlowEncodingVersion encodingVersion) { + if (encodingVersion == null || encodingVersion.getMajorVersion() < 1) { + // Calculate new Positions if the encoding version of the flow is older than 1.0. + PositionScaler.scale(rootGroup, 1.5, 1.34); + } + } + private static boolean isEmpty(final ProcessGroupDTO dto) { if (dto == null) { return true; http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index d3f5a1a..052679b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.groups; +import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -36,6 +37,7 @@ import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.LocalPort; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Positionable; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; @@ 
-72,6 +74,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -2140,6 +2143,18 @@ public final class StandardProcessGroup implements ProcessGroup { return true; } + @Override + public Set<Positionable> findAllPositionables() { + Set<Positionable> positionables = Sets.newHashSet(); + positionables.addAll(findAllConnectables(this, true)); + List<ProcessGroup> allProcessGroups = findAllProcessGroups(); + positionables.addAll(allProcessGroups); + positionables.addAll(allProcessGroups.stream().flatMap(processGroup -> processGroup.findAllPositionables().stream()).collect(Collectors.toSet())); + positionables.addAll(findAllRemoteProcessGroups()); + positionables.addAll(findAllLabels()); + return positionables; + } + private Set<Connectable> findAllConnectables(final ProcessGroup group, final boolean includeRemotePorts) { final Set<Connectable> set = new HashSet<>(); set.addAll(group.getInputPorts()); http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 0f34553..c1e66e5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -15,7 +15,7 @@ --> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" version="1.0"> <xs:element 
name="flowController" type="FlowControllerType" /> - + <xs:complexType name="FlowControllerType"> <xs:sequence> <xs:choice> @@ -35,6 +35,7 @@ <xs:element name="reportingTasks" type="ReportingTasksType" minOccurs="0" maxOccurs="1" /> </xs:sequence> + <xs:attribute name="encoding-version" type="xs:string"/> </xs:complexType> <!-- the processor "id" is a key that should be valid within each flowController--> http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/PositionScalerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/PositionScalerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/PositionScalerSpec.groovy new file mode 100644 index 0000000..c7e0dd8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/PositionScalerSpec.groovy @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller + +import org.apache.nifi.connectable.Connectable +import org.apache.nifi.connectable.Position +import org.apache.nifi.connectable.Positionable +import spock.lang.Specification +import spock.lang.Unroll + +class PositionScalerSpec extends Specification { + + @Unroll + def "scale #positionableType.getSimpleName()"() { + given: + def positionable = Mock positionableType + + when: + PositionScaler.scale positionable, factorX, factorY + + then: + 1 * positionable.position >> new Position(originalX, originalY) + 1 * positionable.setPosition(_) >> { Position p -> + assert p.x == newX + assert p.y == newY + } + + where: + positionableType | originalX | originalY | factorX | factorY | newX | newY + Connectable | 10 | 10 | 1.5 | 1.5 | 15 | 15 + Positionable | -10 | -10 | 1.5 | 1.5 | -15 | -15 + } + + //TODO Test scaling of a ProcessGroup +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy new file mode 100644 index 0000000..c095bc8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller + +import groovy.xml.XmlUtil +import org.apache.nifi.cluster.protocol.DataFlow +import org.apache.nifi.connectable.* +import org.apache.nifi.controller.label.Label +import org.apache.nifi.controller.queue.FlowFileQueue +import org.apache.nifi.groups.ProcessGroup +import org.apache.nifi.groups.RemoteProcessGroup +import org.apache.nifi.processor.Relationship +import org.apache.nifi.reporting.BulletinRepository +import org.apache.nifi.util.NiFiProperties +import spock.lang.Specification +import spock.lang.Unroll + +class StandardFlowSynchronizerSpec extends Specification { + + def setupSpec() { + System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties" + } + + def teardownSpec() { + System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH + } + + @Unroll + def "scaling of #filename with encoding version \"#flowEncodingVersion\""() { + given: "a StandardFlowSynchronizer with mocked collaborators" + def controller = Mock FlowController + def proposedFlow = Mock DataFlow + def snippetManager = Mock SnippetManager + def bulletinRepository = Mock BulletinRepository + def flowFileQueue = Mock FlowFileQueue + def flowFile = new File(StandardFlowSynchronizerSpec.getResource(filename).toURI()) + def flowControllerXml 
= new XmlSlurper().parse(flowFile) + def Map<String, Position> originalPositionablePositionsById = flowControllerXml.rootGroup.'**' + .findAll { !it.name().equals('connection') && it.id.size() == 1 && it.position.size() == 1 } + .collectEntries { [it.id.text(), new Position(it.position.@x.toDouble(), it.position.@y.toDouble())] } + def Map<String, List<Position>> originalBendPointsByConnectionId = flowControllerXml.rootGroup.'**' + .findAll { it.name().equals('connection') && it.bendPoints.size() > 0 } + .collectEntries { [it.id.text(), it.bendPoints.children().collect { new Position(it.@x.toDouble(), it.@y.toDouble()) }] } + flowControllerXml.@'encoding-version' = flowEncodingVersion + def testFlowBytes = XmlUtil.serialize(flowControllerXml).bytes + def Map<String, Position> positionablePositionsById = [:] + def Map<String, Positionable> positionableMocksById = [:] + def Map<String, Connection> connectionMocksById = [:] + def Map<String, List<Position>> bendPointPositionsByConnectionId = [:] + // the unit under test + def flowSynchronizer = new StandardFlowSynchronizer(null) + + when: "the flow is synchronized with the current state of the controller" + flowSynchronizer.sync controller, proposedFlow, null + + then: "establish interactions for the mocked collaborators of StandardFlowSynchronizer to store the ending positions of components" + 1 * controller.isInitialized() >> false + _ * controller.rootGroupId >> flowControllerXml.rootGroup.id.text() + _ * controller.getGroup(_) >> { String id -> positionableMocksById.get(id) } + _ * controller.snippetManager >> snippetManager + _ * controller.bulletinRepository >> bulletinRepository + _ * controller./set.*/(*_) + _ * controller.createProcessGroup(_) >> { String pgId -> + def processGroup = Mock(ProcessGroup) + _ * processGroup.getIdentifier() >> pgId + _ * processGroup.getPosition() >> { positionablePositionsById.get(pgId) } + _ * processGroup.setPosition(_) >> { Position pos -> + positionablePositionsById.put pgId, pos
+ } + _ * processGroup./(add|set).*/(*_) + _ * processGroup.isEmpty() >> true + _ * processGroup.isRootGroup() >> { pgId == flowControllerXml.rootGroup.id } + _ * processGroup.getConnectable(_) >> { String connId -> positionableMocksById.get(connId) } + _ * processGroup.findAllPositionables() >> { + def foundProcessGroup = flowControllerXml.rootGroup.'**'.find { it.id == pgId } + def idsUnderPg = foundProcessGroup.'**'.findAll { it.name() == 'id' }.collect { it.text() } + positionableMocksById.entrySet().collect { + if (idsUnderPg.contains(it.key)) { + it.value + } + } + } + _ * processGroup.findAllConnections() >> { + def foundProcessGroup = flowControllerXml.rootGroup.'**'.find { it.id == pgId } + def foundConnections = foundProcessGroup.'**'.findAll { it.name() == 'connection' }.collect { it.id.text() } + connectionMocksById.entrySet().collect { + if (foundConnections.contains(it.key)) { + it.value + } + } + } + positionableMocksById.put(pgId, processGroup) + return processGroup + } + + _ * controller.createProcessor(_, _, _) >> { String type, String id, boolean firstTimeAdded -> + def processor = Mock(ProcessorNode) + _ * processor.getPosition() >> { positionablePositionsById.get(id) } + _ * processor.setPosition(_) >> { Position pos -> + positionablePositionsById.put id, pos + } + _ * processor./(add|set).*/(*_) + _ * processor.getIdentifier() >> id + _ * processor.getRelationship(_) >> { String n -> new Relationship.Builder().name(n).build() } + positionableMocksById.put(id, processor) + return processor + } + _ * controller.createFunnel(_) >> { String id -> + def funnel = Mock(Funnel) + _ * funnel.getPosition() >> { positionablePositionsById.get(id) } + _ * funnel.setPosition(_) >> { Position pos -> + positionablePositionsById.put id, pos + } + _ * funnel./(add|set).*/(*_) + positionableMocksById.put id, funnel + return funnel + } + _ * controller.createLabel(_, _) >> { String id, String text -> + def l = Mock(Label) + _ * l.getPosition() >> { 
positionablePositionsById.get(id) } + _ * l.setPosition(_) >> { Position pos -> + positionablePositionsById.put id, pos + } + _ * l./(add|set).*/(*_) + positionableMocksById.put(id, l) + return l + } + _ * controller./create.*Port/(_, _) >> { String id, String text -> + def port = Mock(Port) + _ * port.getPosition() >> { positionablePositionsById.get(id) } + _ * port.setPosition(_) >> { Position pos -> + positionablePositionsById.put id, pos + } + _ * port./(add|set).*/(*_) + positionableMocksById.put(id, port) + return port + } + _ * controller.createRemoteProcessGroup(_, _) >> { String id, String uri -> + def rpg = Mock(RemoteProcessGroup) + _ * rpg.getPosition() >> { positionablePositionsById.get(id) } + _ * rpg.setPosition(_) >> { Position pos -> + positionablePositionsById.put id, pos + } + _ * rpg./(add|set).*/(*_) + _ * rpg.getOutputPort(_) >> { String rpgId -> positionableMocksById.get(rpgId) } + _ * rpg.getIdentifier() >> id + positionableMocksById.put(id, rpg) + return rpg + } + _ * controller.createConnection(_, _, _, _, _) >> { String id, String name, Connectable source, Connectable destination, Collection<String> relationshipNames -> + def connection = Mock(Connection) + _ * connection.getIdentifier() >> id + _ * connection.getBendPoints() >> { + def bendpoints = bendPointPositionsByConnectionId.get(id) + return bendpoints + } + _ * connection.setBendPoints(_) >> { + // There seems to be a bug in Spock method matching where a list of arguments to a method + // is being coerced into an Arrays$ArrayList with the actual list of bend points as an + // ArrayList in the 0th element. + // Need to keep an eye on this... 
+ bendPointPositionsByConnectionId.put id, it[0] + } + _ * connection./set.*/(*_) + _ * connection.flowFileQueue >> flowFileQueue + connectionMocksById.put(id, connection) + return connection + } + _ * controller.startProcessor(*_) + _ * controller.startConnectable(_) + _ * controller.enableControllerServices(_) + _ * snippetManager.export() >> { + [] as byte[] + } + _ * snippetManager.clear() + 1 * proposedFlow.flow >> testFlowBytes + _ * proposedFlow.snippets >> { + [] as byte[] + } + _ * flowFileQueue./set.*/(*_) + _ * _.hashCode() >> 1 + 0 * _ // no other mock calls allowed + + then: "verify that the flow was scaled properly" + originalPositionablePositionsById.entrySet().forEach { entry -> + assert positionablePositionsById.containsKey(entry.key) + def originalPosition = entry.value + def position = positionablePositionsById.get(entry.key) + compareOriginalPointToScaledPoint(originalPosition, position, isSyncedPositionGreater) + } + originalBendPointsByConnectionId.entrySet().forEach { entry -> + assert bendPointPositionsByConnectionId.containsKey(entry.key) + def originalBendPoints = entry.value + def sortedBendPoints = bendPointPositionsByConnectionId.get(entry.key).sort { it.x } + def sortedOriginalBendPoints = originalBendPoints.sort { it.x } + assert sortedOriginalBendPoints.size() == sortedBendPoints.size() + [sortedOriginalBendPoints, sortedBendPoints].transpose().forEach { Position originalPosition, Position position -> + compareOriginalPointToScaledPoint(originalPosition, position, isSyncedPositionGreater) + } + } + + where: "the each flowfile and flow encoding version is run through the StandardFlowSynchronizer" + filename | flowEncodingVersion | isSyncedPositionGreater + '/conf/scale-positions-flow-0.7.0.xml' | null | true + '/conf/scale-positions-flow-0.7.0.xml' | '0.7' | true + '/conf/scale-positions-flow-0.7.0.xml' | '1.0' | false + '/conf/scale-positions-flow-0.7.0.xml' | '99.0' | false + } + + private void 
compareOriginalPointToScaledPoint(Position originalPosition, Position position, boolean isSyncedPositionGreater) { + if (originalPosition.x == 0) { + assert position.x == 0 + } + if (originalPosition.y == 0) { + assert position.y == 0 + } + if (originalPosition.x > 0) { + assert isSyncedPositionGreater == position.x > originalPosition.x + } + if (originalPosition.y > 0) { + assert isSyncedPositionGreater == position.y > originalPosition.y + } + if (originalPosition.x < 0) { + assert isSyncedPositionGreater == position.x < originalPosition.x + } + if (originalPosition.y < 0) { + assert isSyncedPositionGreater == position.y < originalPosition.y + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/433db235/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index eebcfa5..c10679f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -17,12 +17,6 @@ package org.apache.nifi.controller.service.mock; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.connectable.Connectable; @@ -30,6 +24,7 @@ import 
org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Positionable; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; @@ -39,6 +34,12 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + public class MockProcessGroup implements ProcessGroup { private Map<String, ControllerServiceNode> serviceMap = new HashMap<>(); @@ -268,6 +269,11 @@ public class MockProcessGroup implements ProcessGroup { } @Override + public Set<Positionable> findAllPositionables() { + return null; + } + + @Override public Connectable getConnectable(String id) { return null; }
