http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 0000000,8cff5dd..856ccc1 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@@ -1,0 -1,2019 +1,2020 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.groups; + + import static java.util.Objects.requireNonNull; + + import java.util.ArrayList; + import java.util.HashMap; + import java.util.HashSet; + import java.util.LinkedHashSet; + import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.Set; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantReadWriteLock; + ++import org.apache.nifi.annotation.lifecycle.OnRemoved; ++import org.apache.nifi.annotation.lifecycle.OnShutdown; + import org.apache.nifi.connectable.Connectable; + import org.apache.nifi.connectable.ConnectableType; + import org.apache.nifi.connectable.Connection; + import org.apache.nifi.connectable.Funnel; + import org.apache.nifi.connectable.LocalPort; + import org.apache.nifi.connectable.Port; + import org.apache.nifi.connectable.Position; + import org.apache.nifi.controller.ProcessScheduler; + import org.apache.nifi.controller.ProcessorNode; + import org.apache.nifi.controller.ScheduledState; + import org.apache.nifi.controller.Snippet; + import org.apache.nifi.controller.exception.ProcessorLifeCycleException; + import org.apache.nifi.controller.label.Label; + import org.apache.nifi.controller.service.ControllerServiceProvider; + import org.apache.nifi.logging.LogRepositoryFactory; + import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.processor.SimpleProcessLogger; + import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.processor.annotation.OnRemoved; -import org.apache.nifi.processor.annotation.OnShutdown; + import org.apache.nifi.remote.RemoteGroupPort; + import org.apache.nifi.remote.RootGroupPort; + import org.apache.nifi.util.NiFiProperties; + import org.apache.nifi.util.ReflectionUtils; + import org.apache.commons.lang3.StringUtils; + import org.apache.commons.lang3.builder.HashCodeBuilder; + import org.apache.commons.lang3.builder.ToStringBuilder; 
+ import org.apache.commons.lang3.builder.ToStringStyle; + import org.apache.nifi.encrypt.StringEncryptor; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public final class StandardProcessGroup implements ProcessGroup { + + private final String id; + private final AtomicReference<ProcessGroup> parent; + private final AtomicReference<String> name; + private final AtomicReference<Position> position; + private final AtomicReference<String> comments; + + private final ProcessScheduler scheduler; + private final ControllerServiceProvider controllerServiceProvider; + + private final Map<String, Port> inputPorts = new HashMap<>(); + private final Map<String, Port> outputPorts = new HashMap<>(); + private final Map<String, Connection> connections = new HashMap<>(); + private final Map<String, ProcessGroup> processGroups = new HashMap<>(); + private final Map<String, Label> labels = new HashMap<>(); + private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>(); + private final Map<String, ProcessorNode> processors = new HashMap<>(); + private final Map<String, Funnel> funnels = new HashMap<>(); + private final StringEncryptor encryptor; + + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class); + + public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, final NiFiProperties nifiProps, final StringEncryptor encryptor) { + this.id = id; + this.controllerServiceProvider = serviceProvider; + this.parent = new AtomicReference<>(); + this.scheduler = scheduler; + this.comments = new AtomicReference<>(""); + this.encryptor = encryptor; + name = new AtomicReference<>(); + position = new AtomicReference<>(new Position(0D, 0D)); + } + + @Override + public 
ProcessGroup getParent() { + return parent.get(); + } + + @Override + public void setParent(final ProcessGroup newParent) { + parent.set(newParent); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public String getName() { + return name.get(); + } + + @Override + public void setName(final String name) { + if (StringUtils.isBlank(name)) { + throw new IllegalArgumentException("The name cannot be blank."); + } + + this.name.set(name); + } + + @Override + public void setPosition(final Position position) { + this.position.set(position); + } + + @Override + public Position getPosition() { + return position.get(); + } + + @Override + public String getComments() { + return this.comments.get(); + } + + @Override + public void setComments(final String comments) { + this.comments.set(comments); + } + + @Override + public ProcessGroupCounts getCounts() { + int inputPortCount = 0; + int outputPortCount = 0; + + int running = 0; + int stopped = 0; + int invalid = 0; + int disabled = 0; + int activeRemotePorts = 0; + int inactiveRemotePorts = 0; + + readLock.lock(); + try { + for (final ProcessorNode procNode : processors.values()) { + if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) { + disabled++; + } else if (procNode.isRunning()) { + running++; + } else if (!procNode.isValid()) { + invalid++; + } else { + stopped++; + } + } + + inputPortCount = inputPorts.size(); + for (final Port port : inputPorts.values()) { + if (ScheduledState.DISABLED.equals(port.getScheduledState())) { + disabled++; + } else if (port.isRunning()) { + running++; + } else if (!port.isValid()) { + invalid++; + } else { + stopped++; + } + } + + outputPortCount = outputPorts.size(); + for (final Port port : outputPorts.values()) { + if (ScheduledState.DISABLED.equals(port.getScheduledState())) { + disabled++; + } else if (port.isRunning()) { + running++; + } else if (!port.isValid()) { + invalid++; + } else { + stopped++; + } + } + + for (final 
ProcessGroup childGroup : processGroups.values()) { + final ProcessGroupCounts childCounts = childGroup.getCounts(); + running += childCounts.getRunningCount(); + stopped += childCounts.getStoppedCount(); + invalid += childCounts.getInvalidCount(); + disabled += childCounts.getDisabledCount(); + } + + for (final RemoteProcessGroup remoteGroup : findAllRemoteProcessGroups()) { + // Count only input ports that have incoming connections + for (final Port port : remoteGroup.getInputPorts()) { + if (port.hasIncomingConnection()) { + if (port.isRunning()) { + activeRemotePorts++; + } else { + inactiveRemotePorts++; + } + } + } + + // Count only output ports that have outgoing connections + for (final Port port : remoteGroup.getOutputPorts()) { + if (!port.getConnections().isEmpty()) { + if (port.isRunning()) { + activeRemotePorts++; + } else { + inactiveRemotePorts++; + } + } + } + + final String authIssue = remoteGroup.getAuthorizationIssue(); + if (authIssue != null) { + invalid++; + } + } + } finally { + readLock.unlock(); + } + + return new ProcessGroupCounts(inputPortCount, outputPortCount, running, stopped, + invalid, disabled, activeRemotePorts, inactiveRemotePorts); + } + + @Override + public boolean isRootGroup() { + return parent.get() == null; + } + + @Override + public void startProcessing() { + readLock.lock(); + try { + for (final ProcessorNode node : processors.values()) { + try { + if (!node.isRunning() && node.getScheduledState() != ScheduledState.DISABLED) { + startProcessor(node); + } + } catch (final Throwable t) { + LOG.error("Unable to start {} due to {}", new Object[]{node, t}); + } + } + + for (final Port inputPort : getInputPorts()) { + if (inputPort.getScheduledState() != ScheduledState.DISABLED) { + startInputPort(inputPort); + } + } + + for (final Port outputPort : getOutputPorts()) { + if (outputPort.getScheduledState() != ScheduledState.DISABLED) { + startOutputPort(outputPort); + } + } + + for (final Funnel funnel : getFunnels()) { + if 
(funnel.getScheduledState() != ScheduledState.DISABLED) { + startFunnel(funnel); + } + } + + // Recursively start child groups. + for (final ProcessGroup group : processGroups.values()) { + group.startProcessing(); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void stopProcessing() { + readLock.lock(); + try { + for (final ProcessorNode node : processors.values()) { + try { + if (node.isRunning()) { + stopProcessor(node); + } + } catch (final Throwable t) { + LOG.error("Unable to stop {} due to {}", new Object[]{node, t}); + } + } + + for (final Port inputPort : getInputPorts()) { + if (inputPort.getScheduledState() == ScheduledState.RUNNING) { + stopInputPort(inputPort); + } + } + + for (final Port outputPort : getOutputPorts()) { + if (outputPort.getScheduledState() == ScheduledState.RUNNING) { + stopOutputPort(outputPort); + } + } + + // Recursively stop child groups. + for (final ProcessGroup group : processGroups.values()) { + group.stopProcessing(); + } + } finally { + readLock.unlock(); + } + } + ++ @SuppressWarnings("deprecation") + private void shutdown(final ProcessGroup procGroup) { + for (final ProcessorNode node : procGroup.getProcessors()) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor()); ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, node.getProcessor()); + } + } + + for ( final RemoteProcessGroup rpg : procGroup.getRemoteProcessGroups() ) { + rpg.shutdown(); + } + + // Recursively shutdown child groups. 
+ for (final ProcessGroup group : procGroup.getProcessGroups()) { + shutdown(group); + } + } + + @Override + public void shutdown() { + readLock.lock(); + try { + shutdown(this); + } finally { + readLock.unlock(); + } + } + + @Override + public void addInputPort(final Port port) { + if (isRootGroup()) { + if (!(port instanceof RootGroupPort)) { + throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to the Root Group"); + } + } else if (!(port instanceof LocalPort)) { + throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to a non-root group"); + } + + writeLock.lock(); + try { + if (inputPorts.containsKey(requireNonNull(port).getIdentifier())) { + throw new IllegalStateException("Input Port with ID " + port.getIdentifier() + " already exists"); + } + + if (getInputPortByName(port.getName()) != null) { + throw new IllegalStateException("Input Port with name " + port.getName() + " already exists"); + } + + port.setProcessGroup(this); + inputPorts.put(requireNonNull(port).getIdentifier(), port); + } finally { + writeLock.unlock(); + } + } + + @Override + public void removeInputPort(final Port port) { + writeLock.lock(); + try { + final Port toRemove = inputPorts.get(requireNonNull(port).getIdentifier()); + if (toRemove == null) { + throw new IllegalStateException(port + " is not an Input Port of this Process Group"); + } + + port.verifyCanDelete(); + for (final Connection conn : port.getConnections()) { + conn.verifyCanDelete(); + } + + if (port.isRunning()) { + stopInputPort(port); + } + + // must copy to avoid a concurrent modification + final Set<Connection> copy = new HashSet<>(port.getConnections()); + for (final Connection conn : copy) { + removeConnection(conn); + } + + final Port removed = inputPorts.remove(port.getIdentifier()); + if (removed == null) { + throw new IllegalStateException(port + " is not an Input Port of this Process Group"); + } + + LOG.info("Input 
Port {} removed from flow", port); + } finally { + writeLock.unlock(); + } + } + + @Override + public Port getInputPort(final String id) { + readLock.lock(); + try { + return inputPorts.get(Objects.requireNonNull(id)); + } finally { + readLock.unlock(); + } + } + + @Override + public Set<Port> getInputPorts() { + readLock.lock(); + try { + return new HashSet<>(inputPorts.values()); + } finally { + readLock.unlock(); + } + } + + @Override + public void addOutputPort(final Port port) { + if (isRootGroup()) { + if (!(port instanceof RootGroupPort)) { + throw new IllegalArgumentException("Cannot add Output Port of type " + port.getClass().getName() + " to the Root Group"); + } + } else if (!(port instanceof LocalPort)) { + throw new IllegalArgumentException("Cannot add Output Port of type " + port.getClass().getName() + " to a non-root group"); + } + + writeLock.lock(); + try { + if (outputPorts.containsKey(requireNonNull(port).getIdentifier())) { + throw new IllegalStateException("Output Port with ID " + port.getIdentifier() + " already exists"); + } + + if (getOutputPortByName(port.getName()) != null) { + throw new IllegalStateException("Output Port with Name " + port.getName() + " already exists"); + } + + port.setProcessGroup(this); + outputPorts.put(port.getIdentifier(), port); + } finally { + writeLock.unlock(); + } + } + + /** + * Removes the given Output Port from this Process Group while holding the write lock. + * The port is looked up by identifier, {@code verifyCanDelete()} is invoked on the registered instance, + * the port is stopped if it is running, and removal is refused while it still has connections. + * + * @param port the Output Port to remove; must not be null + * @throws NullPointerException if {@code port} is null + * @throws IllegalStateException if the port is not an Output Port of this Process Group or still has connections + */ + @Override + public void removeOutputPort(final Port port) { + writeLock.lock(); + try { + final Port toRemove = outputPorts.get(requireNonNull(port).getIdentifier()); + /* an unregistered port must be reported as an IllegalStateException (as removeInputPort does), not as a NullPointerException from the call below */ + if (toRemove == null) { + throw new IllegalStateException(port + " is not an Output Port of this Process Group"); + } + + toRemove.verifyCanDelete(); + + if (port.isRunning()) { + stopOutputPort(port); + } + + if (!toRemove.getConnections().isEmpty()) { + throw new IllegalStateException(port + " cannot be removed until its connections are removed"); + } + + final Port removed = outputPorts.remove(port.getIdentifier()); + if (removed == null) { + throw new IllegalStateException(port + " is not an Output Port of this Process Group"); + } + + LOG.info("Output Port {} removed from flow", port); + } 
finally { + writeLock.unlock(); + } + } + + @Override + public Port getOutputPort(final String id) { + readLock.lock(); + try { + return outputPorts.get(Objects.requireNonNull(id)); + } finally { + readLock.unlock(); + } + } + + @Override + public Set<Port> getOutputPorts() { + readLock.lock(); + try { + return new HashSet<>(outputPorts.values()); + } finally { + readLock.unlock(); + } + } + + @Override + public void addProcessGroup(final ProcessGroup group) { + if (StringUtils.isEmpty(group.getName())) { + throw new IllegalArgumentException("Process Group's name must be specified"); + } + + writeLock.lock(); + try { + group.setParent(this); + processGroups.put(Objects.requireNonNull(group).getIdentifier(), group); + } finally { + writeLock.unlock(); + } + } + + @Override + public ProcessGroup getProcessGroup(final String id) { + readLock.lock(); + try { + return processGroups.get(id); + } finally { + readLock.unlock(); + } + } + + @Override + public Set<ProcessGroup> getProcessGroups() { + readLock.lock(); + try { + return new HashSet<>(processGroups.values()); + } finally { + readLock.unlock(); + } + } + + @Override + public void removeProcessGroup(final ProcessGroup group) { + if (!requireNonNull(group).isEmpty()) { + throw new IllegalStateException("Cannot remove " + group + " because it is not empty"); + } + + writeLock.lock(); + try { + final ProcessGroup toRemove = processGroups.get(group.getIdentifier()); + if (toRemove == null) { + throw new IllegalStateException(group + " is not a member of this Process Group"); + } + verifyCanRemove(toRemove); + + processGroups.remove(group.getIdentifier()); + + LOG.info("{} removed from flow", group); + } finally { + writeLock.unlock(); + } + } + + private void verifyCanRemove(final ProcessGroup childGroup) { + if (!childGroup.isEmpty()) { + throw new IllegalStateException("Cannot remove ProcessGroup because it is not empty"); + } + } + + @Override + public void addRemoteProcessGroup(final RemoteProcessGroup 
remoteGroup) { + writeLock.lock(); + try { + if (remoteGroups.containsKey(requireNonNull(remoteGroup).getIdentifier())) { + throw new IllegalStateException("RemoteProcessGroup already exists with ID " + remoteGroup.getIdentifier()); + } + + remoteGroup.setProcessGroup(this); + remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), remoteGroup); + } finally { + writeLock.unlock(); + } + } + + @Override + public Set<RemoteProcessGroup> getRemoteProcessGroups() { + readLock.lock(); + try { + return new HashSet<>(remoteGroups.values()); + } finally { + readLock.unlock(); + } + } + + @Override + public void removeRemoteProcessGroup(final RemoteProcessGroup remoteProcessGroup) { + final String remoteGroupId = requireNonNull(remoteProcessGroup).getIdentifier(); + + writeLock.lock(); + try { + final RemoteProcessGroup remoteGroup = remoteGroups.get(remoteGroupId); + if (remoteGroup == null) { + throw new IllegalStateException(remoteProcessGroup + " is not a member of this Process Group"); + } + + remoteGroup.verifyCanDelete(); + for (final RemoteGroupPort port : remoteGroup.getOutputPorts()) { + for (final Connection connection : port.getConnections()) { + connection.verifyCanDelete(); + } + } + + for (final RemoteGroupPort port : remoteGroup.getOutputPorts()) { + // must copy to avoid a concurrent modification + final Set<Connection> copy = new HashSet<>(port.getConnections()); + for (final Connection connection : copy) { + removeConnection(connection); + } + } + + try { + remoteGroup.onRemove(); + } catch (final Exception e) { + LOG.warn("Failed to clean up resources for {} due to {}", remoteGroup, e); + } + + remoteGroups.remove(remoteGroupId); + LOG.info("{} removed from flow", remoteProcessGroup); + } finally { + writeLock.unlock(); + } + } + + @Override + public void addProcessor(final ProcessorNode processor) { + writeLock.lock(); + try { + final String processorId = requireNonNull(processor).getIdentifier(); + final ProcessorNode existingProcessor = 
processors.get(processorId); + if (existingProcessor != null) { + throw new IllegalStateException("A processor is already registered to this ProcessGroup with ID " + processorId); + } + + processor.setProcessGroup(this); + processors.put(processorId, processor); + } finally { + writeLock.unlock(); + } + } + ++ @SuppressWarnings("deprecation") + @Override + public void removeProcessor(final ProcessorNode processor) { + final String id = requireNonNull(processor).getIdentifier(); + writeLock.lock(); + try { + if (!processors.containsKey(id)) { + throw new IllegalStateException(processor + " is not a member of this Process Group"); + } + + processor.verifyCanDelete(); + for (final Connection conn : processor.getConnections()) { + conn.verifyCanDelete(); + } + + try (final NarCloseable x = NarCloseable.withNarLoader()) { + final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor); - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, processor.getProcessor(), processContext); + } catch (final Exception e) { + throw new ProcessorLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e); + } + + processors.remove(id); + LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers(); + + // must copy to avoid a concurrent modification + final Set<Connection> copy = new HashSet<>(processor.getConnections()); + for (final Connection conn : copy) { + removeConnection(conn); + } + + LOG.info("{} removed from flow", processor); + } finally { + writeLock.unlock(); + } + } + + @Override + public Set<ProcessorNode> getProcessors() { + readLock.lock(); + try { + return new LinkedHashSet<>(processors.values()); + } finally { + readLock.unlock(); + } + } + + @Override + public ProcessorNode 
getProcessor(final String id) { + readLock.lock(); + try { + return processors.get(Objects.requireNonNull(id)); + } finally { + readLock.unlock(); + } + } + + private boolean isInputPort(final Connectable connectable) { + if (connectable.getConnectableType() != ConnectableType.INPUT_PORT) { + return false; + } + return findInputPort(connectable.getIdentifier()) != null; + } + + private boolean isOutputPort(final Connectable connectable) { + if (connectable.getConnectableType() != ConnectableType.OUTPUT_PORT) { + return false; + } + return findOutputPort(connectable.getIdentifier()) != null; + } + + @Override + public void inheritConnection(final Connection connection) { + writeLock.lock(); + try { + connections.put(connection.getIdentifier(), connection); + } finally { + writeLock.unlock(); + } + } + + @Override + public void addConnection(final Connection connection) { + writeLock.lock(); + try { + final String id = requireNonNull(connection).getIdentifier(); + final Connection existingConnection = connections.get(id); + if (existingConnection != null) { + throw new IllegalStateException("Connection already exists with ID " + id); + } + + final Connectable source = connection.getSource(); + final Connectable destination = connection.getDestination(); + final ProcessGroup sourceGroup = source.getProcessGroup(); + final ProcessGroup destinationGroup = destination.getProcessGroup(); + + // validate the connection is validate wrt to the source & destination groups + if (isInputPort(source)) { // if source is an input port, its destination must be in the same group unless it's an input port + if (isInputPort(destination)) { // if destination is input port, it must be in a child group. 
+ if (!processGroups.containsKey(destinationGroup.getIdentifier())) { + throw new IllegalStateException("Cannot add Connection to Process Group because destination is an Input Port that does not belong to a child Process Group"); + } + } else if (sourceGroup != this || destinationGroup != this) { + throw new IllegalStateException("Cannot add Connection to Process Group because source and destination are not both in this Process Group"); + } + } else if (isOutputPort(source)) { // if source is an output port, its group must be a child of this group, and its destination must be in this group (processor/output port) or a child group (input port) + if (!processGroups.containsKey(sourceGroup.getIdentifier())) { + throw new IllegalStateException("Cannot add Connection to Process Group because source is an Output Port that does not belong to a child Process Group"); + } + + if (isInputPort(destination)) { + if (!processGroups.containsKey(destinationGroup.getIdentifier())) { + throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input Port that does not belong to a child Process Group"); + } + } else { + if (destinationGroup != this) { + throw new IllegalStateException("Cannot add Connection to Process Group because its destination does not belong to this Process Group"); + } + } + } else { // source is not a port + if (sourceGroup != this) { + throw new IllegalStateException("Cannot add Connection to Process Group because the source does not belong to this Process Group"); + } + + if (isOutputPort(destination)) { + if (destinationGroup != this) { + throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Output Port but does not belong to this Process Group"); + } + } else if (isInputPort(destination)) { + if (!processGroups.containsKey(destinationGroup.getIdentifier())) { + throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an 
Input Port but the Input Port does not belong to a child Process Group"); + } + } else if (destinationGroup != this) { + throw new IllegalStateException("Cannot add Connection between " + source + " and " + destination + " because they are in different Process Groups and neither is an Input Port or Output Port"); + } + } + + connection.setProcessGroup(this); + source.addConnection(connection); + if (source != destination) { // don't call addConnection twice if it's a self-looping connection. + destination.addConnection(connection); + } + connections.put(connection.getIdentifier(), connection); + } finally { + writeLock.unlock(); + } + } + + @Override + public Connectable getConnectable(final String id) { + readLock.lock(); + try { + final ProcessorNode node = processors.get(id); + if (node != null) { + return node; + } + + final Port inputPort = inputPorts.get(id); + if (inputPort != null) { + return inputPort; + } + + final Port outputPort = outputPorts.get(id); + if (outputPort != null) { + return outputPort; + } + + final Funnel funnel = funnels.get(id); + if (funnel != null) { + return funnel; + } + + return null; + } finally { + readLock.unlock(); + } + } + + @Override + public void removeConnection(final Connection connectionToRemove) { + writeLock.lock(); + try { + // verify that Connection belongs to this group + final Connection connection = connections.get(requireNonNull(connectionToRemove).getIdentifier()); + if (connection == null) { + throw new IllegalStateException(connectionToRemove + " is not a member of this Process Group"); + } + + connectionToRemove.verifyCanDelete(); + + final Connectable source = connectionToRemove.getSource(); + final Connectable dest = connectionToRemove.getDestination(); + + // update the source & destination + source.removeConnection(connection); + if (source != dest) { + dest.removeConnection(connection); + } + + // remove the connection from our map + connections.remove(connection.getIdentifier()); + LOG.info("{} removed 
from flow", connection); + } finally { + writeLock.unlock(); + } + } + + @Override + public Set<Connection> getConnections() { + readLock.lock(); + try { + return new HashSet<>(connections.values()); + } finally { + readLock.unlock(); + } + } + + @Override + public Connection getConnection(final String id) { + readLock.lock(); + try { + return connections.get(Objects.requireNonNull(id)); + } finally { + readLock.unlock(); + } + } + + @Override + public List<Connection> findAllConnections() { + return findAllConnections(this); + } + + private List<Connection> findAllConnections(final ProcessGroup group) { + final List<Connection> connections = new ArrayList<>(group.getConnections()); + for (final ProcessGroup childGroup : group.getProcessGroups()) { + connections.addAll(findAllConnections(childGroup)); + } + return connections; + } + + @Override + public void addLabel(final Label label) { + writeLock.lock(); + try { + final Label existing = labels.get(requireNonNull(label).getIdentifier()); + if (existing != null) { + throw new IllegalStateException("A label already exists in this ProcessGroup with ID " + label.getIdentifier()); + } + + label.setProcessGroup(this); + labels.put(label.getIdentifier(), label); + } finally { + writeLock.unlock(); + } + } + + @Override + public void removeLabel(final Label label) { + writeLock.lock(); + try { + final Label removed = labels.remove(requireNonNull(label).getIdentifier()); + if (removed == null) { + throw new IllegalStateException(label + " is not a member of this Process Group."); + } + + LOG.info("Label with ID {} removed from flow", label.getIdentifier()); + } finally { + writeLock.unlock(); + } + } + + @Override + public Set<Label> getLabels() { + readLock.lock(); + try { + return new HashSet<>(labels.values()); + } finally { + readLock.unlock(); + } + } + + @Override + public Label getLabel(final String id) { + readLock.lock(); + try { + return labels.get(id); + } finally { + readLock.unlock(); + } + } + + @Override + 
public boolean isEmpty() { + readLock.lock(); + try { + return inputPorts.isEmpty() && outputPorts.isEmpty() && connections.isEmpty() + && processGroups.isEmpty() && labels.isEmpty() && processors.isEmpty() && remoteGroups.isEmpty(); + } finally { + readLock.unlock(); + } + } + + @Override + public RemoteProcessGroup getRemoteProcessGroup(final String id) { + readLock.lock(); + try { + return remoteGroups.get(Objects.requireNonNull(id)); + } finally { + readLock.unlock(); + } + } + + @Override + public void startProcessor(final ProcessorNode processor) { + readLock.lock(); + try { + if (getProcessor(processor.getIdentifier()) == null) { + throw new IllegalStateException("Processor is not a member of this Process Group"); + } + + final ScheduledState state = processor.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("Processor is disabled"); + } else if (state == ScheduledState.RUNNING) { + return; + } + + scheduler.startProcessor(processor); + } finally { + readLock.unlock(); + } + } + + @Override + public void startInputPort(final Port port) { + readLock.lock(); + try { + if (getInputPort(port.getIdentifier()) == null) { + throw new IllegalStateException(port + " is not a member of this Process Group"); + } + + final ScheduledState state = port.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("InputPort " + port + " is disabled"); + } else if (state == ScheduledState.RUNNING) { + return; + } + + scheduler.startPort(port); + } finally { + readLock.unlock(); + } + } + + @Override + public void startOutputPort(final Port port) { + readLock.lock(); + try { + if (getOutputPort(port.getIdentifier()) == null) { + throw new IllegalStateException("Port is not a member of this Process Group"); + } + + final ScheduledState state = port.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("OutputPort is disabled"); + } else if (state == 
ScheduledState.RUNNING) { + return; + } + + scheduler.startPort(port); + } finally { + readLock.unlock(); + } + } + + @Override + public void startFunnel(final Funnel funnel) { + readLock.lock(); + try { + if (getFunnel(funnel.getIdentifier()) == null) { + throw new IllegalStateException("Funnel is not a member of this Process Group"); + } + + final ScheduledState state = funnel.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("Funnel is disabled"); + } else if (state == ScheduledState.RUNNING) { + return; + } + scheduler.startFunnel(funnel); + } finally { + readLock.unlock(); + } + } + + @Override + public void stopProcessor(final ProcessorNode processor) { + readLock.lock(); + try { + if (!processors.containsKey(processor.getIdentifier())) { + throw new IllegalStateException("No processor with ID " + processor.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = processor.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("Processor is disabled"); + } else if (state == ScheduledState.STOPPED) { + return; + } + + scheduler.stopProcessor(processor); + } finally { + readLock.unlock(); + } + } + + @Override + public void stopInputPort(final Port port) { + readLock.lock(); + try { + if (!inputPorts.containsKey(port.getIdentifier())) { + throw new IllegalStateException("No Input Port with ID " + port.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = port.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("InputPort is disabled"); + } else if (state == ScheduledState.STOPPED) { + return; + } + + scheduler.stopPort(port); + } finally { + readLock.unlock(); + } + } + + @Override + public void stopOutputPort(final Port port) { + readLock.lock(); + try { + if (!outputPorts.containsKey(port.getIdentifier())) { + throw new IllegalStateException("No Output Port 
with ID " + port.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = port.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("OutputPort is disabled"); + } else if (state == ScheduledState.STOPPED) { + return; + } + + scheduler.stopPort(port); + } finally { + readLock.unlock(); + } + } + + @Override + public void stopFunnel(final Funnel funnel) { + readLock.lock(); + try { + if (!funnels.containsKey(funnel.getIdentifier())) { + throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = funnel.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("Funnel is disabled"); + } else if (state == ScheduledState.STOPPED) { + return; + } + + scheduler.stopFunnel(funnel); + } finally { + readLock.unlock(); + } + } + + @Override + public void enableFunnel(final Funnel funnel) { + readLock.lock(); + try { + if (!funnels.containsKey(funnel.getIdentifier())) { + throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = funnel.getScheduledState(); + if (state == ScheduledState.STOPPED) { + return; + } else if (state == ScheduledState.RUNNING) { + throw new IllegalStateException("Funnel is currently running"); + } + + scheduler.enableFunnel(funnel); + } finally { + readLock.unlock(); + } + } + + @Override + public void enableInputPort(final Port port) { + readLock.lock(); + try { + if (!inputPorts.containsKey(port.getIdentifier())) { + throw new IllegalStateException("No Input Port with ID " + port.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = port.getScheduledState(); + if (state == ScheduledState.STOPPED) { + return; + } else if (state == ScheduledState.RUNNING) { + throw new IllegalStateException("InputPort is currently 
running"); + } + + scheduler.enablePort(port); + } finally { + readLock.unlock(); + } + } + + @Override + public void enableOutputPort(final Port port) { + readLock.lock(); + try { + if (!outputPorts.containsKey(port.getIdentifier())) { + throw new IllegalStateException("No Output Port with ID " + port.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = port.getScheduledState(); + if (state == ScheduledState.STOPPED) { + return; + } else if (state == ScheduledState.RUNNING) { + throw new IllegalStateException("OutputPort is currently running"); + } + + scheduler.enablePort(port); + } finally { + readLock.unlock(); + } + } + + @Override + public void enableProcessor(final ProcessorNode processor) { + readLock.lock(); + try { + if (!processors.containsKey(processor.getIdentifier())) { + throw new IllegalStateException("No Processor with ID " + processor.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = processor.getScheduledState(); + if (state == ScheduledState.STOPPED) { + return; + } else if (state == ScheduledState.RUNNING) { + throw new IllegalStateException("Processor is currently running"); + } + + scheduler.enableProcessor(processor); + } finally { + readLock.unlock(); + } + } + + @Override + public void disableFunnel(final Funnel funnel) { + readLock.lock(); + try { + if (!funnels.containsKey(funnel.getIdentifier())) { + throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = funnel.getScheduledState(); + if (state == ScheduledState.DISABLED) { + return; + } else if (state == ScheduledState.RUNNING) { + throw new IllegalStateException("Funnel is currently running"); + } + + scheduler.disableFunnel(funnel); + } finally { + readLock.unlock(); + } + } + + @Override + public void disableInputPort(final Port port) { + readLock.lock(); + try { + if 
(!inputPorts.containsKey(port.getIdentifier())) { + throw new IllegalStateException("No InputPort with ID " + port.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = port.getScheduledState(); + if (state == ScheduledState.DISABLED) { + return; + } else if (state == ScheduledState.RUNNING) { + throw new IllegalStateException("InputPort is currently running"); + } + + scheduler.disablePort(port); + } finally { + readLock.unlock(); + } + } + + @Override + public void disableOutputPort(final Port port) { + readLock.lock(); + try { + if (!outputPorts.containsKey(port.getIdentifier())) { + throw new IllegalStateException("No OutputPort with ID " + port.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = port.getScheduledState(); + if (state == ScheduledState.DISABLED) { + return; + } else if (state == ScheduledState.RUNNING) { + throw new IllegalStateException("OutputPort is currently running"); + } + + scheduler.disablePort(port); + } finally { + readLock.unlock(); + } + } + + @Override + public void disableProcessor(final ProcessorNode processor) { + readLock.lock(); + try { + if (!processors.containsKey(processor.getIdentifier())) { + throw new IllegalStateException("No Processor with ID " + processor.getIdentifier() + " belongs to this Process Group"); + } + + final ScheduledState state = processor.getScheduledState(); + if (state == ScheduledState.DISABLED) { + return; + } else if (state == ScheduledState.RUNNING) { + throw new IllegalStateException("Processor is currently running"); + } + + scheduler.disableProcessor(processor); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean equals(final Object obj) { + if (obj instanceof StandardProcessGroup) { + final StandardProcessGroup other = (StandardProcessGroup) obj; + return (getIdentifier().equals(other.getIdentifier())); + } else { + return false; + } + } + + @Override + public int hashCode() { + return new 
HashCodeBuilder().append(getIdentifier()).toHashCode(); + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("name", name).toString(); + } + + @Override + public ProcessGroup findProcessGroup(final String id) { + return findProcessGroup(requireNonNull(id), this); + } + + private ProcessGroup findProcessGroup(final String id, final ProcessGroup start) { + if (id.equals(start.getIdentifier())) { + return start; + } + + for (final ProcessGroup group : start.getProcessGroups()) { + final ProcessGroup matching = findProcessGroup(id, group); + if (matching != null) { + return matching; + } + } + + return null; + } + + @Override + public List<RemoteProcessGroup> findAllRemoteProcessGroups() { + return findAllRemoteProcessGroups(this); + } + + private List<RemoteProcessGroup> findAllRemoteProcessGroups(final ProcessGroup start) { + final List<RemoteProcessGroup> remoteGroups = new ArrayList<>(start.getRemoteProcessGroups()); + for (final ProcessGroup childGroup : start.getProcessGroups()) { + remoteGroups.addAll(findAllRemoteProcessGroups(childGroup)); + } + return remoteGroups; + } + + @Override + public RemoteProcessGroup findRemoteProcessGroup(final String id) { + return findRemoteProcessGroup(requireNonNull(id), this); + } + + private RemoteProcessGroup findRemoteProcessGroup(final String id, final ProcessGroup start) { + RemoteProcessGroup remoteGroup = start.getRemoteProcessGroup(id); + if (remoteGroup != null) { + return remoteGroup; + } + + for (final ProcessGroup group : start.getProcessGroups()) { + remoteGroup = findRemoteProcessGroup(id, group); + if (remoteGroup != null) { + return remoteGroup; + } + } + + return null; + } + + @Override + public ProcessorNode findProcessor(final String id) { + return findProcessor(id, this); + } + + private ProcessorNode findProcessor(final String id, final ProcessGroup start) { + ProcessorNode node = start.getProcessor(id); + if (node != null) { + return 
node; + } + + for (final ProcessGroup group : start.getProcessGroups()) { + node = findProcessor(id, group); + if (node != null) { + return node; + } + } + + return null; + } + + @Override + public List<ProcessorNode> findAllProcessors() { + return findAllProcessors(this); + } + + private List<ProcessorNode> findAllProcessors(final ProcessGroup start) { + final List<ProcessorNode> allNodes = new ArrayList<>(start.getProcessors()); + for (final ProcessGroup group : start.getProcessGroups()) { + allNodes.addAll(findAllProcessors(group)); + } + return allNodes; + } + + public Connectable findConnectable(final String identifier) { + return findConnectable(identifier, this); + } + + private static Connectable findConnectable(final String identifier, final ProcessGroup group) { + final ProcessorNode procNode = group.getProcessor(identifier); + if (procNode != null) { + return procNode; + } + + final Port inPort = group.getInputPort(identifier); + if (inPort != null) { + return inPort; + } + + final Port outPort = group.getOutputPort(identifier); + if (outPort != null) { + return outPort; + } + + final Funnel funnel = group.getFunnel(identifier); + if (funnel != null) { + return funnel; + } + + for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { + final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier); + if (remoteInPort != null) { + return remoteInPort; + } + + final RemoteGroupPort remoteOutPort = remoteGroup.getOutputPort(identifier); + if (remoteOutPort != null) { + return remoteOutPort; + } + } + + for (final ProcessGroup childGroup : group.getProcessGroups()) { + final Connectable childGroupConnectable = findConnectable(identifier, childGroup); + if (childGroupConnectable != null) { + return childGroupConnectable; + } + } + + return null; + } + + @Override + public List<Label> findAllLabels() { + return findAllLabels(this); + } + + private List<Label> findAllLabels(final ProcessGroup start) { + final List<Label> allLabels 
= new ArrayList<>(start.getLabels()); + for (final ProcessGroup group : start.getProcessGroups()) { + allLabels.addAll(findAllLabels(group)); + } + return allLabels; + } + + @Override + public Port findInputPort(final String id) { + return findPort(id, this, new InputPortRetriever()); + } + + @Override + public Port findOutputPort(final String id) { + return findPort(id, this, new OutputPortRetriever()); + } + + @Override + public Port getInputPortByName(final String name) { + return getPortByName(name, this, new InputPortRetriever()); + } + + @Override + public Port getOutputPortByName(final String name) { + return getPortByName(name, this, new OutputPortRetriever()); + } + + private interface PortRetriever { + + Port getPort(ProcessGroup group, String id); + + Set<Port> getPorts(ProcessGroup group); + } + + private static class InputPortRetriever implements PortRetriever { + + @Override + public Set<Port> getPorts(final ProcessGroup group) { + return group.getInputPorts(); + } + + @Override + public Port getPort(final ProcessGroup group, final String id) { + return group.getInputPort(id); + } + } + + private static class OutputPortRetriever implements PortRetriever { + + @Override + public Set<Port> getPorts(final ProcessGroup group) { + return group.getOutputPorts(); + } + + @Override + public Port getPort(final ProcessGroup group, final String id) { + return group.getOutputPort(id); + } + } + + private Port findPort(final String id, final ProcessGroup group, final PortRetriever retriever) { + Port port = retriever.getPort(group, id); + if (port != null) { + return port; + } + + for (final ProcessGroup childGroup : group.getProcessGroups()) { + port = findPort(id, childGroup, retriever); + if (port != null) { + return port; + } + } + + return null; + } + + private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) { + for (final Port port : retriever.getPorts(group)) { + if (port.getName().equals(name)) { + return 
port; + } + } + + return null; + } + + @Override + public void addFunnel(final Funnel funnel) { + writeLock.lock(); + try { + final Funnel existing = funnels.get(requireNonNull(funnel).getIdentifier()); + if (existing != null) { + throw new IllegalStateException("A funnel already exists in this ProcessGroup with ID " + funnel.getIdentifier()); + } + + funnel.setProcessGroup(this); + funnels.put(funnel.getIdentifier(), funnel); + } finally { + writeLock.unlock(); + } + } + + @Override + public Funnel getFunnel(final String id) { + readLock.lock(); + try { + return funnels.get(id); + } finally { + readLock.unlock(); + } + } + + @Override + public void removeFunnel(final Funnel funnel) { + writeLock.lock(); + try { + final Funnel existing = funnels.get(requireNonNull(funnel).getIdentifier()); + if (existing == null) { + throw new IllegalStateException(funnel + " is not a member of this ProcessGroup"); + } + + funnel.verifyCanDelete(); + for (final Connection conn : funnel.getConnections()) { + conn.verifyCanDelete(); + } + + stopFunnel(funnel); + + // must copy to avoid a concurrent modification + final Set<Connection> copy = new HashSet<>(funnel.getConnections()); + for (final Connection conn : copy) { + removeConnection(conn); + } + + funnels.remove(funnel.getIdentifier()); + LOG.info("{} removed from flow", funnel); + } finally { + writeLock.unlock(); + } + } + + @Override + public Set<Funnel> getFunnels() { + readLock.lock(); + try { + return new HashSet<>(funnels.values()); + } finally { + readLock.unlock(); + } + } + + @Override + public void remove(final Snippet snippet) { + writeLock.lock(); + try { + // ensure that all components are valid + verifyContents(snippet); + + final Set<Connectable> connectables = getAllConnectables(snippet); + final Set<String> connectionIdsToRemove = new HashSet<>(replaceNullWithEmptySet(snippet.getConnections())); + // Remove all connections that are the output of any Connectable. 
+ for (final Connectable connectable : connectables) { + for (final Connection conn : connectable.getConnections()) { + if (!connections.containsKey(conn.getIdentifier())) { + throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections from the parent Process Group"); + } + connectionIdsToRemove.add(conn.getIdentifier()); + } + } + + // verify that all connections can be removed + for (final String id : connectionIdsToRemove) { + connections.get(id).verifyCanDelete(); + } + + // verify that all processors are stopped and have no active threads + for (final String procId : snippet.getProcessors()) { + final ProcessorNode procNode = getProcessor(procId); + if (procNode.isRunning()) { + throw new IllegalStateException(procNode + " cannot be removed because it is running"); + } + final int activeThreadCount = scheduler.getActiveThreadCount(procNode); + if (activeThreadCount != 0) { + throw new IllegalStateException(procNode + " cannot be removed because it still has " + activeThreadCount + " active threads"); + } + } + + // verify that none of the connectables have incoming connections that are not in the Snippet. 
+ final Set<String> connectionIds = snippet.getConnections(); + for (final Connectable connectable : connectables) { + for (final Connection conn : connectable.getIncomingConnections()) { + if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) { + throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections that are not selected to be deleted"); + } + } + } + + // verify that all of the ProcessGroups in the snippet are empty + for (final String groupId : snippet.getProcessGroups()) { + final ProcessGroup toRemove = getProcessGroup(groupId); + if (!toRemove.isEmpty()) { + throw new IllegalStateException("Process Group with name " + toRemove.getName() + " cannot be removed because it is not empty"); + } + } + + for (final String id : connectionIdsToRemove) { + removeConnection(connections.get(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getInputPorts())) { + removeInputPort(inputPorts.get(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getOutputPorts())) { + removeOutputPort(outputPorts.get(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) { + removeFunnel(funnels.get(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getLabels())) { + removeLabel(labels.get(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) { + removeProcessGroup(processGroups.get(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) { + removeProcessor(processors.get(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) { + removeRemoteProcessGroup(remoteGroups.get(id)); + } + } finally { + writeLock.unlock(); + } + } + + private Set<String> replaceNullWithEmptySet(final Set<String> toReplace) { + return (toReplace == null) ? 
new HashSet<String>() : toReplace; + } + + @Override + public void move(final Snippet snippet, final ProcessGroup destination) { + writeLock.lock(); + try { + verifyContents(snippet); + + if (!isDisconnected(snippet)) { + throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved."); + } + + if (isRootGroup() && (!snippet.getInputPorts().isEmpty() || !snippet.getOutputPorts().isEmpty())) { + throw new IllegalStateException("Cannot move Ports from the Root Group to a Non-Root Group"); + } + + for (final String id : replaceNullWithEmptySet(snippet.getInputPorts())) { + destination.addInputPort(inputPorts.remove(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getOutputPorts())) { + destination.addOutputPort(outputPorts.remove(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) { + destination.addFunnel(funnels.remove(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getLabels())) { + destination.addLabel(labels.remove(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) { + destination.addProcessGroup(processGroups.remove(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) { + destination.addProcessor(processors.remove(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) { + destination.addRemoteProcessGroup(remoteGroups.remove(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getConnections())) { + destination.inheritConnection(connections.remove(id)); + } + } finally { + writeLock.unlock(); + } + } + + private Set<Connectable> getAllConnectables(final Snippet snippet) { + final Set<Connectable> connectables = new HashSet<>(); + for (final String id : replaceNullWithEmptySet(snippet.getInputPorts())) { + connectables.add(getInputPort(id)); + } + for (final String id : 
replaceNullWithEmptySet(snippet.getOutputPorts())) { + connectables.add(getOutputPort(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) { + connectables.add(getFunnel(id)); + } + for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) { + connectables.add(getProcessor(id)); + } + return connectables; + } + + private boolean isDisconnected(final Snippet snippet) { + final Set<Connectable> connectables = getAllConnectables(snippet); + + for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) { + final RemoteProcessGroup remoteGroup = getRemoteProcessGroup(id); + connectables.addAll(remoteGroup.getInputPorts()); + connectables.addAll(remoteGroup.getOutputPorts()); + } + + final Set<String> connectionIds = snippet.getConnections(); + for (final Connectable connectable : connectables) { + for (final Connection conn : connectable.getIncomingConnections()) { + if (!connectionIds.contains(conn.getIdentifier())) { + return false; + } + } + + for (final Connection conn : connectable.getConnections()) { + if (!connectionIds.contains(conn.getIdentifier())) { + return false; + } + } + } + + final Set<Connectable> recursiveConnectables = new HashSet<>(connectables); + for (final String id : snippet.getProcessGroups()) { + final ProcessGroup childGroup = getProcessGroup(id); + recursiveConnectables.addAll(findAllConnectables(childGroup, true)); + } + + for (final String id : connectionIds) { + final Connection connection = getConnection(id); + if (!recursiveConnectables.contains(connection.getSource()) || !recursiveConnectables.contains(connection.getDestination())) { + return false; + } + } + + return true; + } + + private Set<Connectable> findAllConnectables(final ProcessGroup group, final boolean includeRemotePorts) { + final Set<Connectable> set = new HashSet<>(); + set.addAll(group.getInputPorts()); + set.addAll(group.getOutputPorts()); + set.addAll(group.getFunnels()); + 
set.addAll(group.getProcessors()); + if (includeRemotePorts) { + for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { + set.addAll(remoteGroup.getInputPorts()); + set.addAll(remoteGroup.getOutputPorts()); + } + } + + for (final ProcessGroup childGroup : group.getProcessGroups()) { + set.addAll(findAllConnectables(childGroup, includeRemotePorts)); + } + + return set; + } + + /** + * Verifies that all ID's defined within the given snippet reference + * components within this ProcessGroup. If this is not the case, throws + * {@link IllegalStateException}. + * + * @param snippet + * @throws NullPointerException if the argument is null + * @throws IllegalStateException if the snippet contains an ID that + * references a component that is not part of this ProcessGroup + */ + private void verifyContents(final Snippet snippet) throws NullPointerException, IllegalStateException { + requireNonNull(snippet); + + verifyAllKeysExist(snippet.getInputPorts(), inputPorts, "Input Port"); + verifyAllKeysExist(snippet.getOutputPorts(), outputPorts, "Output Port"); + verifyAllKeysExist(snippet.getFunnels(), funnels, "Funnel"); + verifyAllKeysExist(snippet.getLabels(), labels, "Label"); + verifyAllKeysExist(snippet.getProcessGroups(), processGroups, "Process Group"); + verifyAllKeysExist(snippet.getProcessors(), processors, "Processor"); + verifyAllKeysExist(snippet.getRemoteProcessGroups(), remoteGroups, "Remote Process Group"); + verifyAllKeysExist(snippet.getConnections(), connections, "Connection"); + } + + /** + * <p> + * Verifies that all ID's specified by the given set exist as keys in the + * given Map. If any of the ID's does not exist as a key in the map, will + * throw {@link IllegalStateException} indicating the ID that is invalid and + * specifying the Component Type. + * </p> + * + * <p> + * If the ids given are null, will do no validation. 
+ * </p> + * + * @param ids + * @param map + * @param componentType + */ + private void verifyAllKeysExist(final Set<String> ids, final Map<String, ?> map, final String componentType) { + if (ids != null) { + for (final String id : ids) { + if (!map.containsKey(id)) { + throw new IllegalStateException("ID " + id + " does not refer to a(n) " + componentType + " in this ProcessGroup"); + } + } + } + } + + @Override + public void verifyCanDelete() { + if (!isEmpty()) { + throw new IllegalStateException(this + " is not empty"); + } + } + + @Override + public void verifyCanStop() { + } + + @Override + public void verifyCanStart() { + readLock.lock(); + try { + for (final Connectable connectable : findAllConnectables(this, false)) { + if (connectable.getScheduledState() == ScheduledState.STOPPED) { + if (scheduler.getActiveThreadCount(connectable) > 0) { + throw new IllegalStateException("Cannot start " + connectable + " because it is currently stopping"); + } + + connectable.verifyCanStart(); + } + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanDelete(final Snippet snippet) throws IllegalStateException { + readLock.lock(); + try { + if (!id.equals(snippet.getParentGroupId())) { + throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id); + } + + if (!isDisconnected(snippet)) { + throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. 
Only a disconnected snippet may be moved."); + } + + for (final String id : snippet.getConnections()) { + final Connection connection = getConnection(id); + if (connection == null) { + throw new IllegalStateException("Snippet references Connection with ID " + id + ", which does not exist in this ProcessGroup"); + } + + connection.verifyCanDelete(); + } + + for (final String id : snippet.getFunnels()) { + final Funnel funnel = getFunnel(id); + if (funnel == null) { + throw new IllegalStateException("Snippet references Funnel with ID " + id + ", which does not exist in this ProcessGroup"); + } + + funnel.verifyCanDelete(true); + } + + for (final String id : snippet.getInputPorts()) { + final Port port = getInputPort(id); + if (port == null) { + throw new IllegalStateException("Snippet references Input Port with ID " + id + ", which does not exist in this ProcessGroup"); + } + + port.verifyCanDelete(true); + } + + for (final String id : snippet.getLabels()) { + final Label label = getLabel(id); + if (label == null) { + throw new IllegalStateException("Snippet references Label with ID " + id + ", which does not exist in this ProcessGroup"); + } + } + + for (final String id : snippet.getOutputPorts()) { + final Port port = getOutputPort(id); + if (port == null) { + throw new IllegalStateException("Snippet references Output Port with ID " + id + ", which does not exist in this ProcessGroup"); + } + port.verifyCanDelete(true); + } + + for (final String id : snippet.getProcessGroups()) { + final ProcessGroup group = getProcessGroup(id); + if (group == null) { + throw new IllegalStateException("Snippet references Process Group with ID " + id + ", which does not exist in this ProcessGroup"); + } + group.verifyCanDelete(); + } + + for (final String id : snippet.getProcessors()) { + final ProcessorNode processor = getProcessor(id); + if (processor == null) { + throw new IllegalStateException("Snippet references Processor with ID " + id + ", which does not exist in this 
ProcessGroup"); + } + processor.verifyCanDelete(true); + } + + for (final String id : snippet.getRemoteProcessGroups()) { + final RemoteProcessGroup group = getRemoteProcessGroup(id); + if (group == null) { + throw new IllegalStateException("Snippet references Remote Process Group with ID " + id + ", which does not exist in this ProcessGroup"); + } + group.verifyCanDelete(true); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanMove(final Snippet snippet, final ProcessGroup newProcessGroup) throws IllegalStateException { + readLock.lock(); + try { + if (!id.equals(snippet.getParentGroupId())) { + throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id); + } + + verifyContents(snippet); + + if (!isDisconnected(snippet)) { + throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved."); + } + + if (isRootGroup() && (!snippet.getInputPorts().isEmpty() || !snippet.getOutputPorts().isEmpty())) { + throw new IllegalStateException("Cannot move Ports from the Root Group to a Non-Root Group"); + } + + for (final String id : snippet.getInputPorts()) { + final Port port = getInputPort(id); + final String portName = port.getName(); + + if (newProcessGroup.getInputPortByName(portName) != null) { + throw new IllegalStateException("Cannot perform Move Operation because the destination Process Group already has an Input Port with the name " + portName); + } + } + + for (final String id : snippet.getOutputPorts()) { + final Port port = getOutputPort(id); + final String portName = port.getName(); + + if (newProcessGroup.getOutputPortByName(portName) != null) { + throw new IllegalStateException("Cannot perform Move Operation because the destination Process Group already has an Output Port with the name " + portName); + } + } + } finally { + 
readLock.unlock(); + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index 0000000,318901f..ac58504 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@@ -1,0 -1,113 +1,113 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.nifi.processor;

import java.util.Map;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;

/**
 * {@link SchedulingContext} that forwards every {@link ProcessContext}-style call to a wrapped
 * {@link ProcessContext} and, when a Controller Service is leased, registers the owning
 * {@link ProcessorNode} as a reference on that service's node.
 */
public class StandardSchedulingContext implements SchedulingContext {

    /** Context that answers all property, encryption and relationship queries. */
    private final ProcessContext delegate;

    /** Used to resolve a Controller Service identifier to its node. */
    private final ControllerServiceProvider serviceProvider;

    /** Processor that is added as a reference to each leased Controller Service. */
    private final ProcessorNode referencingProcessor;

    /**
     * @param processContext the context that all delegating methods forward to
     * @param serviceProvider the provider used to look up Controller Service nodes by identifier
     * @param processorNode the processor registered as a reference on leased services
     */
    public StandardSchedulingContext(final ProcessContext processContext, final ControllerServiceProvider serviceProvider, final ProcessorNode processorNode) {
        this.delegate = processContext;
        this.serviceProvider = serviceProvider;
        this.referencingProcessor = processorNode;
    }

    /**
     * Looks up the Controller Service node with the given identifier and adds this context's
     * processor as a reference to it.
     *
     * @param identifier identifier of the Controller Service to lease
     * @throws IllegalArgumentException if the provider returns no node for the identifier
     * @throws IllegalStateException if the node reports itself as disabled or as not valid
     */
    @Override
    public void leaseControllerService(final String identifier) {
        final ControllerServiceNode node = serviceProvider.getControllerServiceNode(identifier);
        if (node == null) {
            throw new IllegalArgumentException("Cannot lease Controller Service because no Controller Service exists with identifier " + identifier);
        }

        ensureLeasable(node);
        node.addReference(referencingProcessor);
    }

    /**
     * Rejects nodes that cannot be leased. The disabled check runs before the validity check.
     *
     * @param node the Controller Service node about to be leased
     * @throws IllegalStateException if the node is disabled or is not valid
     */
    private static void ensureLeasable(final ControllerServiceNode node) {
        if (node.isDisabled()) {
            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + node.getProxiedControllerService() + " is currently disabled");
        }

        final boolean valid = node.isValid();
        if (!valid) {
            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + node.getProxiedControllerService() + " is not currently valid");
        }
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public PropertyValue getProperty(final PropertyDescriptor descriptor) {
        return delegate.getProperty(descriptor);
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public PropertyValue getProperty(final String propertyName) {
        return delegate.getProperty(propertyName);
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public PropertyValue newPropertyValue(final String rawValue) {
        return delegate.newPropertyValue(rawValue);
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public Map<PropertyDescriptor, String> getProperties() {
        return delegate.getProperties();
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public String getAnnotationData() {
        return delegate.getAnnotationData();
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public int getMaxConcurrentTasks() {
        return delegate.getMaxConcurrentTasks();
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public void yield() {
        delegate.yield();
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public String encrypt(final String unencrypted) {
        return delegate.encrypt(unencrypted);
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public String decrypt(final String encrypted) {
        return delegate.decrypt(encrypted);
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public ControllerServiceLookup getControllerServiceLookup() {
        return delegate.getControllerServiceLookup();
    }

    /** Delegates to the wrapped {@link ProcessContext}. */
    @Override
    public Set<Relationship> getAvailableRelationships() {
        return delegate.getAvailableRelationships();
    }
}
