Repository: nifi Updated Branches: refs/heads/master b4894c557 -> 4cca9bef7
NIFI-5480: Use FlowController's maps of components in order to look up a component by ID, rather than iterating recursively through all Process Groups to find the component. This closes #2932. Signed-off-by: Jeremy Dyer <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4cca9bef Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4cca9bef Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4cca9bef Branch: refs/heads/master Commit: 4cca9bef7c9bcad912cb7df6f3976715d09a6daf Parents: b4894c5 Author: Mark Payne <[email protected]> Authored: Wed Aug 1 17:49:51 2018 -0400 Committer: Jeremy Dyer <[email protected]> Committed: Wed Aug 1 19:33:56 2018 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/groups/ProcessGroup.java | 8 -- .../apache/nifi/controller/FlowController.java | 33 ++++- .../reporting/StandardReportingContext.java | 4 +- .../nifi/groups/StandardProcessGroup.java | 134 ++++++------------- .../service/mock/MockProcessGroup.java | 5 - .../StandardAuthorizableLookup.java | 4 +- .../nifi/web/controller/ControllerFacade.java | 6 +- .../web/dao/impl/StandardProcessGroupDAO.java | 58 ++++++-- 8 files changed, 130 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index e9c4d87..01451df 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -771,14 +771,6 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi void remove(final Snippet snippet); /** - * @param identifier of connectable - * @return the Connectable with the given ID, if it exists; otherwise - * returns null. This performs a recursive search of all ProcessGroups' - * input ports, output ports, funnels, processors - */ - Connectable findLocalConnectable(String identifier); - - /** * @param identifier of remote group port * @return the RemoteGroupPort with the given ID, if it exists; otherwise * returns null. http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index e7623e5..b83749c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2697,6 +2697,35 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R allProcessors.remove(identifier); } + public Connectable findLocalConnectable(final String id) { + final ProcessorNode procNode = getProcessorNode(id); + if (procNode != null) { + return 
procNode; + } + + final Port inPort = getInputPort(id); + if (inPort != null) { + return inPort; + } + + final Port outPort = getOutputPort(id); + if (outPort != null) { + return outPort; + } + + final Funnel funnel = getFunnel(id); + if (funnel != null) { + return funnel; + } + + final RemoteGroupPort remoteGroupPort = getRootGroup().findRemoteGroupPort(id); + if (remoteGroupPort != null) { + return remoteGroupPort; + } + + return null; + } + public ProcessorNode getProcessorNode(final String id) { return allProcessors.get(id); } @@ -4933,7 +4962,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R authorizable = new DataAuthorizable(getRootGroup()); } else { // check if the component is a connectable, this should be the case most often - final Connectable connectable = getRootGroup().findLocalConnectable(componentId); + final Connectable connectable = findLocalConnectable(componentId); if (connectable == null) { // if the component id is not a connectable then consider a connection final Connection connection = getRootGroup().findConnection(componentId); @@ -4980,7 +5009,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R authorizable = new ProvenanceDataAuthorizable(getRootGroup()); } else { // check if the component is a connectable, this should be the case most often - final Connectable connectable = getRootGroup().findLocalConnectable(componentId); + final Connectable connectable = findLocalConnectable(componentId); if (connectable == null) { // if the component id is not a connectable then consider a connection final Connection connection = getRootGroup().findConnection(componentId); http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index 440fd79..d95a220 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -29,7 +29,6 @@ import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.events.BulletinFactory; -import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; @@ -95,8 +94,7 @@ public class StandardReportingContext implements ReportingContext, ControllerSer @Override public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) { - final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId()); - final Connectable connectable = rootGroup.findLocalConnectable(componentId); + final Connectable connectable = flowController.findLocalConnectable(componentId); if (connectable == null) { throw new IllegalStateException("Cannot create Component-Level Bulletin because no component can be found with ID " + componentId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index e77be2a..edfa355 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -16,31 +16,6 @@ */ package org.apache.nifi.groups; -import static java.util.Objects.requireNonNull; - -import java.io.IOException; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import java.util.stream.Collectors; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -145,6 +120,31 @@ import org.apache.nifi.web.api.dto.TemplateDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collection; 
+import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + public final class StandardProcessGroup implements ProcessGroup { private final String id; @@ -1696,80 +1696,34 @@ public final class StandardProcessGroup implements ProcessGroup { return allNodes; } - @Override - public Connectable findLocalConnectable(final String identifier) { - return findLocalConnectable(identifier, this); - } - - private static Connectable findLocalConnectable(final String identifier, final ProcessGroup group) { - final ProcessorNode procNode = group.getProcessor(identifier); - if (procNode != null) { - return procNode; - } - - final Port inPort = group.getInputPort(identifier); - if (inPort != null) { - return inPort; - } - - final Port outPort = group.getOutputPort(identifier); - if (outPort != null) { - return outPort; - } - - final Funnel funnel = group.getFunnel(identifier); - if (funnel != null) { - return funnel; - } - - for (final RemoteProcessGroup remoteProcessGroup : group.getRemoteProcessGroups()) { - final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(identifier); - if (remoteInputPort != null) { - return remoteInputPort; - } - - final RemoteGroupPort remoteOutputPort = remoteProcessGroup.getOutputPort(identifier); - if (remoteOutputPort != null) { - return remoteOutputPort; - } - } - - for (final ProcessGroup childGroup : group.getProcessGroups()) { - final Connectable childGroupConnectable = 
findLocalConnectable(identifier, childGroup); - if (childGroupConnectable != null) { - return childGroupConnectable; - } - } - - return null; - } @Override public RemoteGroupPort findRemoteGroupPort(final String identifier) { - return findRemoteGroupPort(identifier, this); - } + readLock.lock(); + try { + for (final RemoteProcessGroup remoteGroup : remoteGroups.values()) { + final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier); + if (remoteInPort != null) { + return remoteInPort; + } - private static RemoteGroupPort findRemoteGroupPort(final String identifier, final ProcessGroup group) { - for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { - final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier); - if (remoteInPort != null) { - return remoteInPort; + final RemoteGroupPort remoteOutPort = remoteGroup.getOutputPort(identifier); + if (remoteOutPort != null) { + return remoteOutPort; + } } - final RemoteGroupPort remoteOutPort = remoteGroup.getOutputPort(identifier); - if (remoteOutPort != null) { - return remoteOutPort; + for (final ProcessGroup childGroup : processGroups.values()) { + final RemoteGroupPort childGroupRemoteGroupPort = childGroup.findRemoteGroupPort(identifier); + if (childGroupRemoteGroupPort != null) { + return childGroupRemoteGroupPort; + } } - } - for (final ProcessGroup childGroup : group.getProcessGroups()) { - final RemoteGroupPort childGroupRemoteGroupPort = findRemoteGroupPort(identifier, childGroup); - if (childGroupRemoteGroupPort != null) { - return childGroupRemoteGroupPort; - } + return null; + } finally { + readLock.unlock(); } - - return null; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index 374d02b..ec2caef 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -528,11 +528,6 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public Connectable findLocalConnectable(final String identifier) { - return null; - } - - @Override public RemoteGroupPort findRemoteGroupPort(String identifier) { return null; } http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java index bbdd9e0..88d8c46 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java @@ -732,9 +732,7 @@ class StandardAuthorizableLookup implements AuthorizableLookup { @Override public Authorizable 
getLocalConnectable(String id) { - final ProcessGroup group = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); - final Connectable connectable = group.findLocalConnectable(id); - + final Connectable connectable = controllerFacade.findLocalConnectable(id); if (connectable == null) { throw new ResourceNotFoundException("Unable to find component with id " + id); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 1cbc5e2..1d08081 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -166,6 +166,10 @@ public class ControllerFacade implements Authorizable { } } + public Connectable findLocalConnectable(String componentId) { + return flowController.findLocalConnectable(componentId); + } + public ControllerServiceProvider getControllerServiceProvider() { return flowController; } @@ -1527,7 +1531,7 @@ public class ControllerFacade implements Authorizable { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId()); - final Connectable connectable = root.findLocalConnectable(dto.getComponentId()); + final Connectable connectable = findLocalConnectable(dto.getComponentId()); if (connectable != null) { 
dto.setGroupId(connectable.getProcessGroup().getIdentifier()); http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index cce3390..06aab80 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -43,6 +43,7 @@ import org.apache.nifi.web.dao.ProcessGroupDAO; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -104,13 +105,18 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou public void verifyScheduleComponents(final String groupId, final ScheduledState state,final Set<String> componentIds) { final ProcessGroup group = locateProcessGroup(flowController, groupId); + final Set<ProcessGroup> validGroups = new HashSet<>(); + validGroups.add(group); + validGroups.addAll(group.findAllProcessGroups()); + for (final String componentId : componentIds) { - final Connectable connectable = group.findLocalConnectable(componentId); + final Connectable connectable = findConnectable(componentId, groupId, validGroups); if (connectable == null) { - final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId); - if (remotePort 
== null) { - throw new ResourceNotFoundException("Unable to find component with id " + componentId); - } + throw new ResourceNotFoundException("Unable to find component with id " + componentId); + } + + if (connectable instanceof RemoteGroupPort) { + final RemoteGroupPort remotePort = (RemoteGroupPort) connectable; if (ScheduledState.RUNNING.equals(state)) { remotePort.verifyCanStart(); @@ -134,8 +140,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou public void verifyEnableComponents(String groupId, ScheduledState state, Set<String> componentIds) { final ProcessGroup group = locateProcessGroup(flowController, groupId); + final Set<ProcessGroup> validGroups = new HashSet<>(); + validGroups.add(group); + validGroups.addAll(group.findAllProcessGroups()); + for (final String componentId : componentIds) { - final Connectable connectable = group.findLocalConnectable(componentId); + final Connectable connectable = findConnectable(componentId, groupId, validGroups); if (ScheduledState.STOPPED.equals(state)) { connectable.verifyCanEnable(); } else if (ScheduledState.DISABLED.equals(state)) { @@ -159,14 +169,37 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } } + private Connectable findConnectable(final String componentId, final String groupId, final Set<ProcessGroup> validProcessGroups) { + // Get the component with the given ID and ensure that it belongs to the group that we are looking for. + // We do this, rather than calling ProcessGroup.findLocalConnectable because for any component that is buried several + // layers of Process Groups deep, that method becomes quite a bit more expensive than this method, due to all of the + // Read Locks that must be obtained while recursing through the Process Group's descendant groups. 
+ final Connectable connectable = flowController.findLocalConnectable(componentId); + if (connectable == null) { + throw new ResourceNotFoundException("Could not find Component with ID " + componentId); + } + + final ProcessGroup connectableGroup = connectable.getProcessGroup(); + if (!validProcessGroups.contains(connectableGroup)) { + throw new ResourceNotFoundException("Component with ID " + componentId + " does not belong to Process Group " + groupId + " or any of its descendent groups"); + } + + return connectable; + } + @Override public Future<Void> scheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) { final ProcessGroup group = locateProcessGroup(flowController, groupId); CompletableFuture<Void> future = CompletableFuture.completedFuture(null); + final Set<ProcessGroup> validGroups = new HashSet<>(); + validGroups.add(group); + validGroups.addAll(group.findAllProcessGroups()); + for (final String componentId : componentIds) { - final Connectable connectable = group.findLocalConnectable(componentId); + final Connectable connectable = findConnectable(componentId, groupId, validGroups); + if (ScheduledState.RUNNING.equals(state)) { switch (connectable.getConnectableType()) { case PROCESSOR: @@ -181,7 +214,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou break; case REMOTE_INPUT_PORT: case REMOTE_OUTPUT_PORT: - final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId); + final RemoteGroupPort remotePort = (RemoteGroupPort) connectable; remotePort.getRemoteProcessGroup().startTransmitting(remotePort); break; } @@ -199,7 +232,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou break; case REMOTE_INPUT_PORT: case REMOTE_OUTPUT_PORT: - final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId); + final RemoteGroupPort remotePort = (RemoteGroupPort) connectable; 
remotePort.getRemoteProcessGroup().stopTransmitting(remotePort); break; } @@ -213,8 +246,13 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou public void enableComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) { final ProcessGroup group = locateProcessGroup(flowController, groupId); + final Set<ProcessGroup> validGroups = new HashSet<>(); + validGroups.add(group); + validGroups.addAll(group.findAllProcessGroups()); + for (final String componentId : componentIds) { - final Connectable connectable = group.findLocalConnectable(componentId); + final Connectable connectable = findConnectable(componentId, groupId, validGroups); + if (ScheduledState.STOPPED.equals(state)) { switch (connectable.getConnectableType()) { case PROCESSOR:
