http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java index 8571f51..b5ec133 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java @@ -17,15 +17,12 @@ package org.apache.nifi.controller.cluster; -import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; -import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,14 +33,12 @@ public class ZooKeeperClientConfig { private final int sessionTimeoutMillis; private final int connectionTimeoutMillis; private final String rootPath; - private final List<ACL> acls; - private ZooKeeperClientConfig(String connectString, int sessionTimeoutMillis, int connectionTimeoutMillis, String rootPath, List<ACL> acls) { + private ZooKeeperClientConfig(String connectString, int sessionTimeoutMillis, int connectionTimeoutMillis, String rootPath) { this.connectString = connectString; this.sessionTimeoutMillis = sessionTimeoutMillis; this.connectionTimeoutMillis = connectionTimeoutMillis; this.rootPath = rootPath.endsWith("/") ? rootPath.substring(0, rootPath.length() - 1) : rootPath; - this.acls = acls; } public String getConnectString() { @@ -62,10 +57,6 @@ public class ZooKeeperClientConfig { return rootPath; } - public List<ACL> getACLs() { - return acls; - } - public String resolvePath(final String path) { if (path.startsWith("/")) { return rootPath + path; @@ -83,18 +74,6 @@ public class ZooKeeperClientConfig { final long sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT); final long connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT); final String rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE); - final String accessControl = properties.getProperty(NiFiProperties.ZOOKEEPER_ACCESS_CONTROL); - - final List<ACL> acls; - if (accessControl == null || accessControl.trim().isEmpty()) { - acls = null; - } else if (accessControl.equalsIgnoreCase("Open")) { - acls = Ids.OPEN_ACL_UNSAFE; - } else if (accessControl.equalsIgnoreCase("CreatorOnly")) { - acls = Ids.CREATOR_ALL_ACL; - } else { - acls = null; - } try { PathUtils.validatePath(rootPath); @@ -102,7 +81,7 @@ public class ZooKeeperClientConfig { throw new IllegalArgumentException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath); } - return new ZooKeeperClientConfig(connectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath, acls); + return new ZooKeeperClientConfig(connectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath); } private static int getTimePeriod(final Properties properties, final String propertyName, final String defaultValue) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java deleted file mode 100644 index 4348cec..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.cluster; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Properties; - -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryForever; -import org.apache.nifi.cluster.protocol.NodeProtocolSender; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Uses ZooKeeper in order to determine which node is the elected Cluster Coordinator and to indicate - * that this node is part of the cluster. However, once the Cluster Coordinator is known, heartbeats are - * sent directly to the Cluster Coordinator. - */ -public class ZooKeeperHeartbeater implements Heartbeater { - private static final Logger logger = LoggerFactory.getLogger(ZooKeeperHeartbeater.class); - - private final NodeProtocolSender protocolSender; - private final CuratorFramework curatorClient; - private final String nodesPathPrefix; - - private final String coordinatorPath; - private volatile String coordinatorAddress; - - - public ZooKeeperHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) { - this.protocolSender = protocolSender; - - final RetryPolicy retryPolicy = new RetryForever(5000); - final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties); - - curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), - zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); - - curatorClient.start(); - nodesPathPrefix = zkConfig.resolvePath("cluster/nodes"); - coordinatorPath = nodesPathPrefix + "/coordinator"; - } - - private String getHeartbeatAddress() throws IOException { - final String curAddress = coordinatorAddress; - if (curAddress != null) { - return curAddress; - } - - try { - // Get coordinator address and add watcher to change who we are heartbeating to if the value changes. - final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() { - @Override - public void process(final WatchedEvent event) { - coordinatorAddress = null; - } - }).forPath(coordinatorPath); - final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8); - - logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address); - return address; - } catch (Exception e) { - throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e); - } - } - - @Override - public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException { - final String heartbeatAddress = getHeartbeatAddress(); - - try { - protocolSender.heartbeat(heartbeatMessage, heartbeatAddress); - } catch (final ProtocolException pe) { - // a ProtocolException is likely the result of not being able to communicate - // with the coordinator. If we do get an IOException communicating with the coordinator, - // it will be the cause of the Protocol Exception. In this case, set coordinatorAddress - // to null so that we double-check next time that the coordinator has not changed. - if (pe.getCause() instanceof IOException) { - coordinatorAddress = null; - } - - throw pe; - } - } - - - @Override - public void close() throws IOException { - if (curatorClient != null) { - curatorClient.close(); - } - - logger.info("ZooKeeper heartbeater closed. Will no longer send Heartbeat messages to ZooKeeper"); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 77dc87e..132d623 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -207,39 +207,46 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } @Override - public void disableReferencingServices(final ControllerServiceNode serviceNode) { + public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) { // Get a list of all Controller Services that need to be disabled, in the order that they need to be // disabled. final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class); final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable); + final Set<ConfiguredComponent> updated = new HashSet<>(); for (final ControllerServiceNode nodeToDisable : toDisable) { if (nodeToDisable.isActive()) { nodeToDisable.verifyCanDisable(serviceSet); + updated.add(nodeToDisable); } } Collections.reverse(toDisable); processScheduler.disableControllerServices(toDisable); + return updated; } @Override - public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) { + public Set<ConfiguredComponent> scheduleReferencingComponents(final ControllerServiceNode serviceNode) { // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, // or a service that references this controller service, etc. final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class); final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); + final Set<ConfiguredComponent> updated = new HashSet<>(); + // verify that we can start all components (that are not disabled) before doing anything for (final ProcessorNode node : processors) { if (node.getScheduledState() != ScheduledState.DISABLED) { node.verifyCanStart(); + updated.add(node); } } for (final ReportingTaskNode node : reportingTasks) { if (node.getScheduledState() != ScheduledState.DISABLED) { node.verifyCanStart(); + updated.add(node); } } @@ -247,22 +254,28 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi for (final ProcessorNode node : processors) { if (node.getScheduledState() != ScheduledState.DISABLED) { node.getProcessGroup().startProcessor(node); + updated.add(node); } } for (final ReportingTaskNode node : reportingTasks) { if (node.getScheduledState() != ScheduledState.DISABLED) { processScheduler.schedule(node); + updated.add(node); } } + + return updated; } @Override - public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { + public Set<ConfiguredComponent> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, // or a service that references this controller service, etc. final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class); final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); + final Set<ConfiguredComponent> updated = new HashSet<>(); + // verify that we can stop all components (that are running) before doing anything for (final ProcessorNode node : processors) { if (node.getScheduledState() == ScheduledState.RUNNING) { @@ -279,13 +292,17 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi for (final ProcessorNode node : processors) { if (node.getScheduledState() == ScheduledState.RUNNING) { node.getProcessGroup().stopProcessor(node); + updated.add(node); } } for (final ReportingTaskNode node : reportingTasks) { if (node.getScheduledState() == ScheduledState.RUNNING) { processScheduler.unschedule(node); + updated.add(node); } } + + return updated; } @Override @@ -541,16 +558,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } @Override - public void enableReferencingServices(final ControllerServiceNode serviceNode) { + public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) { final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class); - enableReferencingServices(serviceNode, recursiveReferences); + return enableReferencingServices(serviceNode, recursiveReferences); } - private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) { + private Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) { if (!serviceNode.isActive()) { serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences)); } + final Set<ConfiguredComponent> updated = new HashSet<>(); + final Set<ControllerServiceNode> ifEnabled = new HashSet<>(); for (final ControllerServiceNode nodeToEnable : recursiveReferences) { if (!nodeToEnable.isActive()) { @@ -562,8 +581,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi for (final ControllerServiceNode nodeToEnable : recursiveReferences) { if (!nodeToEnable.isActive()) { enableControllerService(nodeToEnable); + updated.add(nodeToEnable); } } + + return updated; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java index 8b9f383..c298803 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java @@ -26,72 +26,42 @@ public enum ConnectionStatusDescriptor { "Bytes In (5 mins)", "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, - new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getInputBytes(); - } - })), + s -> s.getInputBytes())), INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>( "inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, - new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getInputCount()); - } - })), + s -> Long.valueOf(s.getInputCount()))), OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>( "outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, - new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getOutputBytes(); - } - })), + s -> s.getOutputBytes())), OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>( "outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, - new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getOutputCount()); - } - })), + s -> Long.valueOf(s.getOutputCount()))), QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>( "queuedBytes", "Queued Bytes", "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, - new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getQueuedBytes(); - } - })), + s -> s.getQueuedBytes())), QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>( "queuedCount", "Queued Count", "The number of FlowFiles queued in this Connection", Formatter.COUNT, - new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getQueuedCount()); - } - })); + s -> Long.valueOf(s.getQueuedCount()))); private MetricDescriptor<ConnectionStatus> descriptor; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java index d5325d0..25b9dfc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java @@ -25,92 +25,66 @@ import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; public enum ProcessGroupStatusDescriptor { - BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)", - "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesRead(); - } - })), - - BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)", - "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesWritten(); - } - })), - - BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)", + BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>( + "bytesRead", + "Bytes Read (5 mins)", + "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", + Formatter.DATA_SIZE, + s -> s.getBytesRead())), + + BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", + "Bytes Written (5 mins)", + "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", + Formatter.DATA_SIZE, + s -> s.getBytesWritten())), + + BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", + "Bytes Transferred (5 mins)", "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesRead() + status.getBytesWritten(); - } - })), - - INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", + Formatter.DATA_SIZE, + s -> s.getBytesRead() + s.getBytesWritten())), + + INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", + "Bytes In (5 mins)", "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getInputContentSize(); - } - })), - - INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)", + Formatter.DATA_SIZE, + s -> s.getInputContentSize())), + + INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", + "FlowFiles In (5 mins)", "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", - Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getInputCount().longValue(); - } - })), - - OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)", + Formatter.COUNT, + s -> s.getInputCount().longValue())), + + OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", + "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getOutputContentSize(); - } - })), - - OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)", + Formatter.DATA_SIZE, + s -> s.getOutputContentSize())), + + OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", + "FlowFiles Out (5 mins)", "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", - Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getOutputCount().longValue(); - } - })), - - QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", + Formatter.COUNT, + s -> s.getOutputCount().longValue())), + + QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", + "Queued Bytes", "The cumulative size of all FlowFiles queued in all Connections of this Process Group", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getQueuedContentSize(); - } - })), - - QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", - "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getQueuedCount().longValue(); - } - })), - - TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)", + Formatter.DATA_SIZE, + s -> s.getQueuedContentSize())), + + QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", + "Queued Count", + "The number of FlowFiles queued in all Connections of this Process Group", + Formatter.COUNT, + s -> s.getQueuedCount().longValue())), + + TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", + "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", - Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return calculateTaskMillis(status); - } - })); + Formatter.DURATION, + s -> calculateTaskMillis(s))); private MetricDescriptor<ProcessGroupStatus> descriptor; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java index 89e8aa0..05c32e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java @@ -29,130 +29,78 @@ public enum ProcessorStatusDescriptor { "Bytes Read (5 mins)", "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesRead(); - } - })), + s -> s.getBytesRead())), BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>( "bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesWritten(); - } - })), + s -> s.getBytesWritten())), BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>( "bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesRead() + status.getBytesWritten(); - } - })), + s -> s.getBytesRead() + s.getBytesWritten())), INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>( "inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getInputBytes(); - } - })), + s -> s.getInputBytes())), INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>( "inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", - Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getInputCount()); - } - })), + Formatter.COUNT, + s -> Long.valueOf(s.getInputCount()))), + OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>( "outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getOutputBytes(); - } - })), + s -> s.getOutputBytes())), OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>( "outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getOutputCount()); - } - })), + s -> Long.valueOf(s.getOutputCount()))), TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>( "taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", Formatter.COUNT, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getInvocations()); - } - })), + s -> Long.valueOf(s.getInvocations()))), TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>( "taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS); - } - })), + s -> TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS))), FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>( "flowFilesRemoved", "FlowFiles Removed (5 mins)", "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getFlowFilesRemoved()); - } - })), + s -> Long.valueOf(s.getFlowFilesRemoved()))), AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>( "averageLineageDuration", "Average Lineage Duration (5 mins)", "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.", Formatter.DURATION, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getAverageLineageDuration(TimeUnit.MILLISECONDS); - } - }, new ValueReducer<StatusSnapshot, Long>() { + s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS), + new ValueReducer<StatusSnapshot, Long>() { @Override public Long reduce(final List<StatusSnapshot> values) { long millis = 0L; @@ -179,12 +127,7 @@ public enum ProcessorStatusDescriptor { "Average Task Duration", "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes", Formatter.DURATION, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS) / status.getInvocations(); - } - }, + s -> s.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS) / s.getInvocations(), new ValueReducer<StatusSnapshot, Long>() { @Override public Long reduce(final List<StatusSnapshot> values) { http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java index 0499d65..a114a00 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java @@ -24,58 +24,47 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; public enum RemoteProcessGroupStatusDescriptor { - SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)", - "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getSentContentSize(); - } - })), - - SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)", - "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getSentCount().longValue()); - } - })), - - RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)", - "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getReceivedContentSize(); - } - })), - - RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)", - "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getReceivedCount().longValue()); - } - })), - - RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second", + SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", + "Bytes Sent (5 mins)", + "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", + Formatter.DATA_SIZE, + s -> s.getSentContentSize())), + + SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", + "FlowFiles Sent (5 mins)", + "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", + Formatter.COUNT, + s -> s.getSentCount().longValue())), + + RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", + "Bytes Received (5 mins)", + "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", + Formatter.DATA_SIZE, + s -> s.getReceivedContentSize())), + + RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", + "FlowFiles Received (5 mins)", + "The number of FlowFiles that have been received from the remote system in the past 5 minutes", + Formatter.COUNT, + s -> s.getReceivedCount().longValue())), + + RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", + "Received Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", - Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getReceivedContentSize().longValue() / 300L); - } - })), + Formatter.DATA_SIZE, + s -> s.getReceivedContentSize().longValue() / 300L)), - SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second", - "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getSentContentSize().longValue() / 300L); - } - })), + SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", + "Sent Bytes Per Second", + "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", + Formatter.DATA_SIZE, + s -> s.getSentContentSize().longValue() / 300L)), - TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second", + TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", + "Total Bytes Per Second", "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", - Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + Formatter.DATA_SIZE, + new ValueMapper<RemoteProcessGroupStatus>() { @Override public Long getValue(final RemoteProcessGroupStatus status) { return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L); @@ -87,12 +76,8 @@ public enum RemoteProcessGroupStatusDescriptor { "Average Lineage Duration (5 mins)", "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.", Formatter.DURATION, - new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getAverageLineageDuration(TimeUnit.MILLISECONDS); - } - }, new ValueReducer<StatusSnapshot, Long>() { + s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS), + new ValueReducer<StatusSnapshot, Long>() { @Override public Long reduce(final List<StatusSnapshot> values) { long millis = 0L; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java index 36288d5..bbd9210 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java @@ -16,10 +16,7 @@ */ package org.apache.nifi.controller.status.history; -import java.util.ArrayList; -import java.util.Collections; import java.util.Date; -import java.util.List; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -43,36 +40,6 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit private volatile long lastCaptureTime = 0L; - private static final List<MetricDescriptor<ProcessorStatus>> PROCESSOR_METRIC_DESCRIPTORS; - private static final List<MetricDescriptor<ConnectionStatus>> CONNECTION_METRIC_DESCRIPTORS; - private static final List<MetricDescriptor<ProcessGroupStatus>> PROCESS_GROUP_METRIC_DESCRIPTORS; - private static final List<MetricDescriptor<RemoteProcessGroupStatus>> REMOTE_PROCESS_GROUP_METRIC_DESCRIPTORS; - - static { - final List<MetricDescriptor<ProcessorStatus>> procFields = new ArrayList<>(); - for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { - procFields.add(descriptor.getDescriptor()); - } - PROCESSOR_METRIC_DESCRIPTORS = Collections.unmodifiableList(procFields); - - final List<MetricDescriptor<ConnectionStatus>> connFields = new ArrayList<>(); - for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) { - connFields.add(descriptor.getDescriptor()); - } - CONNECTION_METRIC_DESCRIPTORS = Collections.unmodifiableList(connFields); - - final List<MetricDescriptor<ProcessGroupStatus>> groupFields = new ArrayList<>(); - for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) { - groupFields.add(descriptor.getDescriptor()); - } - PROCESS_GROUP_METRIC_DESCRIPTORS = Collections.unmodifiableList(groupFields); - - final List<MetricDescriptor<RemoteProcessGroupStatus>> remoteGroupFields = new ArrayList<>(); - for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) { - remoteGroupFields.add(descriptor.getDescriptor()); - } - REMOTE_PROCESS_GROUP_METRIC_DESCRIPTORS = Collections.unmodifiableList(remoteGroupFields); - } public VolatileComponentStatusRepository() { final NiFiProperties properties = NiFiProperties.getInstance(); @@ -249,26 +216,4 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit return statusReport; } } - - - - @Override - public List<MetricDescriptor<ConnectionStatus>> getConnectionMetricDescriptors() { - return CONNECTION_METRIC_DESCRIPTORS; - } - - @Override - public List<MetricDescriptor<ProcessGroupStatus>> getProcessGroupMetricDescriptors() { - return PROCESS_GROUP_METRIC_DESCRIPTORS; - } - - @Override - public List<MetricDescriptor<RemoteProcessGroupStatus>> getRemoteProcessGroupMetricDescriptors() { - return REMOTE_PROCESS_GROUP_METRIC_DESCRIPTORS; - } - - @Override - public List<MetricDescriptor<ProcessorStatus>> getProcessorMetricDescriptors() { - return PROCESSOR_METRIC_DESCRIPTORS; - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index f06ff88..cd13ba9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -53,10 +53,12 @@ import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; +import org.apache.nifi.web.Revision; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -1765,7 +1767,7 @@ public final class StandardProcessGroup implements ProcessGroup { verifyContents(snippet); final Set<Connectable> connectables = getAllConnectables(snippet); - final Set<String> connectionIdsToRemove = new HashSet<>(replaceNullWithEmptySet(snippet.getConnections())); + final Set<String> connectionIdsToRemove = new HashSet<>(getKeys(snippet.getConnections())); // Remove all connections that are the output of any Connectable. for (final Connectable connectable : connectables) { for (final Connection conn : connectable.getConnections()) { @@ -1782,7 +1784,7 @@ public final class StandardProcessGroup implements ProcessGroup { } // verify that all processors are stopped and have no active threads - for (final String procId : snippet.getProcessors()) { + for (final String procId : snippet.getProcessors().keySet()) { final ProcessorNode procNode = getProcessor(procId); if (procNode.isRunning()) { throw new IllegalStateException(procNode + " cannot be removed because it is running"); @@ -1794,7 +1796,7 @@ public final class StandardProcessGroup implements ProcessGroup { } // verify that none of the connectables have incoming connections that are not in the Snippet. - final Set<String> connectionIds = snippet.getConnections(); + final Set<String> connectionIds = snippet.getConnections().keySet(); for (final Connectable connectable : connectables) { for (final Connection conn : connectable.getIncomingConnections()) { if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) { @@ -1805,7 +1807,7 @@ public final class StandardProcessGroup implements ProcessGroup { } // verify that all of the ProcessGroups in the snippet are empty - for (final String groupId : snippet.getProcessGroups()) { + for (final String groupId : snippet.getProcessGroups().keySet()) { final ProcessGroup toRemove = getProcessGroup(groupId); toRemove.verifyCanDelete(true); } @@ -1813,25 +1815,25 @@ public final class StandardProcessGroup implements ProcessGroup { for (final String id : connectionIdsToRemove) { removeConnection(connections.get(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getInputPorts())) { + for (final String id : getKeys(snippet.getInputPorts())) { removeInputPort(inputPorts.get(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getOutputPorts())) { + for (final String id : getKeys(snippet.getOutputPorts())) { removeOutputPort(outputPorts.get(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) { + for (final String id : getKeys(snippet.getFunnels())) { removeFunnel(funnels.get(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getLabels())) { + for (final String id : getKeys(snippet.getLabels())) { removeLabel(labels.get(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) { + for (final String id : getKeys(snippet.getProcessors())) { removeProcessor(processors.get(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) { + for (final String id : getKeys(snippet.getRemoteProcessGroups())) { removeRemoteProcessGroup(remoteGroups.get(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) { + for (final String id : getKeys(snippet.getProcessGroups())) { removeProcessGroup(processGroups.get(id)); } } finally { @@ -1839,10 +1841,11 @@ public final class StandardProcessGroup implements ProcessGroup { } } - private Set<String> replaceNullWithEmptySet(final Set<String> toReplace) { - return (toReplace == null) ? new HashSet<String>() : toReplace; + private Set<String> getKeys(final Map<String, Revision> map) { + return (map == null) ? Collections.emptySet() : map.keySet(); } + @Override public void move(final Snippet snippet, final ProcessGroup destination) { writeLock.lock(); @@ -1861,28 +1864,28 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException("Cannot move Ports into the root group"); } - for (final String id : replaceNullWithEmptySet(snippet.getInputPorts())) { + for (final String id : getKeys(snippet.getInputPorts())) { destination.addInputPort(inputPorts.remove(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getOutputPorts())) { + for (final String id : getKeys(snippet.getOutputPorts())) { destination.addOutputPort(outputPorts.remove(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) { + for (final String id : getKeys(snippet.getFunnels())) { destination.addFunnel(funnels.remove(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getLabels())) { + for (final String id : getKeys(snippet.getLabels())) { destination.addLabel(labels.remove(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) { + for (final String id : getKeys(snippet.getProcessGroups())) { destination.addProcessGroup(processGroups.remove(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) { + for (final String id : getKeys(snippet.getProcessors())) { destination.addProcessor(processors.remove(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) { + for (final String id : getKeys(snippet.getRemoteProcessGroups())) { destination.addRemoteProcessGroup(remoteGroups.remove(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getConnections())) { + for (final String id : getKeys(snippet.getConnections())) { destination.inheritConnection(connections.remove(id)); } } finally { @@ -1892,16 +1895,16 @@ public final class StandardProcessGroup implements ProcessGroup { private Set<Connectable> getAllConnectables(final Snippet snippet) { final Set<Connectable> connectables = new HashSet<>(); - for (final String id : replaceNullWithEmptySet(snippet.getInputPorts())) { + for (final String id : getKeys(snippet.getInputPorts())) { connectables.add(getInputPort(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getOutputPorts())) { + for (final String id : getKeys(snippet.getOutputPorts())) { connectables.add(getOutputPort(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) { + for (final String id : getKeys(snippet.getFunnels())) { connectables.add(getFunnel(id)); } - for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) { + for (final String id : getKeys(snippet.getProcessors())) { connectables.add(getProcessor(id)); } return connectables; @@ -1910,13 +1913,13 @@ public final class StandardProcessGroup implements ProcessGroup { private boolean isDisconnected(final Snippet snippet) { final Set<Connectable> connectables = getAllConnectables(snippet); - for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) { + for (final String id : getKeys(snippet.getRemoteProcessGroups())) { final RemoteProcessGroup remoteGroup = getRemoteProcessGroup(id); connectables.addAll(remoteGroup.getInputPorts()); connectables.addAll(remoteGroup.getOutputPorts()); } - final Set<String> connectionIds = snippet.getConnections(); + final Set<String> connectionIds = snippet.getConnections().keySet(); for (final Connectable connectable : connectables) { for (final Connection conn : connectable.getIncomingConnections()) { if (!connectionIds.contains(conn.getIdentifier())) { @@ -1932,7 +1935,7 @@ public final class StandardProcessGroup implements ProcessGroup { } final Set<Connectable> recursiveConnectables = new HashSet<>(connectables); - for (final String id : snippet.getProcessGroups()) { + for (final String id : snippet.getProcessGroups().keySet()) { final ProcessGroup childGroup = getProcessGroup(id); recursiveConnectables.addAll(findAllConnectables(childGroup, true)); } @@ -1977,14 +1980,14 @@ public final class StandardProcessGroup implements ProcessGroup { private void verifyContents(final Snippet snippet) throws NullPointerException, IllegalStateException { requireNonNull(snippet); - verifyAllKeysExist(snippet.getInputPorts(), inputPorts, "Input Port"); - verifyAllKeysExist(snippet.getOutputPorts(), outputPorts, "Output Port"); - verifyAllKeysExist(snippet.getFunnels(), funnels, "Funnel"); - verifyAllKeysExist(snippet.getLabels(), labels, "Label"); - verifyAllKeysExist(snippet.getProcessGroups(), processGroups, "Process Group"); - verifyAllKeysExist(snippet.getProcessors(), processors, "Processor"); - verifyAllKeysExist(snippet.getRemoteProcessGroups(), remoteGroups, "Remote Process Group"); - verifyAllKeysExist(snippet.getConnections(), connections, "Connection"); + verifyAllKeysExist(snippet.getInputPorts().keySet(), inputPorts, "Input Port"); + verifyAllKeysExist(snippet.getOutputPorts().keySet(), outputPorts, "Output Port"); + verifyAllKeysExist(snippet.getFunnels().keySet(), funnels, "Funnel"); + verifyAllKeysExist(snippet.getLabels().keySet(), labels, "Label"); + verifyAllKeysExist(snippet.getProcessGroups().keySet(), processGroups, "Process Group"); + verifyAllKeysExist(snippet.getProcessors().keySet(), processors, "Processor"); + verifyAllKeysExist(snippet.getRemoteProcessGroups().keySet(), remoteGroups, "Remote Process Group"); + verifyAllKeysExist(snippet.getConnections().keySet(), connections, "Connection"); } /** @@ -2104,7 +2107,7 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved."); } - for (final String id : snippet.getConnections()) { + for (final String id : snippet.getConnections().keySet()) { final Connection connection = getConnection(id); if (connection == null) { throw new IllegalStateException("Snippet references Connection with ID " + id + ", which does not exist in this ProcessGroup"); @@ -2113,7 +2116,7 @@ public final class StandardProcessGroup implements ProcessGroup { connection.verifyCanDelete(); } - for (final String id : snippet.getFunnels()) { + for (final String id : snippet.getFunnels().keySet()) { final Funnel funnel = getFunnel(id); if (funnel == null) { throw new IllegalStateException("Snippet references Funnel with ID " + id + ", which does not exist in this ProcessGroup"); @@ -2122,7 +2125,7 @@ public final class StandardProcessGroup implements ProcessGroup { funnel.verifyCanDelete(true); } - for (final String id : snippet.getInputPorts()) { + for (final String id : snippet.getInputPorts().keySet()) { final Port port = getInputPort(id); if (port == null) { throw new IllegalStateException("Snippet references Input Port with ID " + id + ", which does not exist in this ProcessGroup"); @@ -2131,14 +2134,14 @@ public final class StandardProcessGroup implements ProcessGroup { port.verifyCanDelete(true); } - for (final String id : snippet.getLabels()) { + for (final String id : snippet.getLabels().keySet()) { final Label label = getLabel(id); if (label == null) { throw new IllegalStateException("Snippet references Label with ID " + id + ", which does not exist in this ProcessGroup"); } } - for (final String id : snippet.getOutputPorts()) { + for (final String id : snippet.getOutputPorts().keySet()) { final Port port = getOutputPort(id); if (port == null) { throw new IllegalStateException("Snippet references Output Port with ID " + id + ", which does not exist in this ProcessGroup"); @@ -2146,7 +2149,7 @@ public final class StandardProcessGroup implements ProcessGroup { port.verifyCanDelete(true); } - for (final String id : snippet.getProcessGroups()) { + for (final String id : snippet.getProcessGroups().keySet()) { final ProcessGroup group = getProcessGroup(id); if (group == null) { throw new IllegalStateException("Snippet references Process Group with ID " + id + ", which does not exist in this ProcessGroup"); @@ -2154,7 +2157,7 @@ public final class StandardProcessGroup implements ProcessGroup { group.verifyCanDelete(true); } - for (final String id : snippet.getProcessors()) { + for (final String id : snippet.getProcessors().keySet()) { final ProcessorNode processor = getProcessor(id); if (processor == null) { throw new IllegalStateException("Snippet references Processor with ID " + id + ", which does not exist in this ProcessGroup"); @@ -2162,7 +2165,7 @@ public final class StandardProcessGroup implements ProcessGroup { processor.verifyCanDelete(true); } - for (final String id : snippet.getRemoteProcessGroups()) { + for (final String id : snippet.getRemoteProcessGroups().keySet()) { final RemoteProcessGroup group = getRemoteProcessGroup(id); if (group == null) { throw new IllegalStateException("Snippet references Remote Process Group with ID " + id + ", which does not exist in this ProcessGroup"); @@ -2192,7 +2195,7 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException("Cannot move Ports from the Root Group to a Non-Root Group"); } - for (final String id : snippet.getInputPorts()) { + for (final String id : snippet.getInputPorts().keySet()) { final Port port = getInputPort(id); final String portName = port.getName(); @@ -2201,7 +2204,7 @@ public final class StandardProcessGroup implements ProcessGroup { } } - for (final String id : snippet.getOutputPorts()) { + for (final String id : snippet.getOutputPorts().keySet()) { final Port port = getOutputPort(id); final String portName = port.getName(); http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java index 92a0d84..54d5959 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.persistence; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -57,6 +58,15 @@ public interface FlowConfigurationDAO { void load(OutputStream os) throws IOException; /** + * Loads the stored flow into the given stream, optionally compressed + * + * @param os the Output Stream to write the flow to + * @param compressed whether or not the data should be gzipped + * @throws IOException if unable to load the flow + */ + void load(OutputStream os, boolean compressed) throws IOException; + + /** * Saves the given stream as the stored flow. * * @param is an input stream @@ -96,4 +106,11 @@ public interface FlowConfigurationDAO { */ void save(FlowController flow, boolean archive) throws IOException; + /** + * Creates a File that can be used to write an archive to. The file will not actually exist on disk. + * + * @return a File that can be used to write an archive to + * @throws IOException if unable to access the required directories + */ + File createArchiveFile() throws IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index b3a6090..ac304ec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -85,6 +85,15 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD } @Override + public void load(final OutputStream os, final boolean compressed) throws IOException { + if (compressed) { + Files.copy(flowXmlPath, os); + } else { + load(os); + } + } + + @Override public synchronized void save(final InputStream is) throws IOException { try (final OutputStream outStream = Files.newOutputStream(flowXmlPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE); final OutputStream gzipOut = new GZIPOutputStream(outStream)) { @@ -140,15 +149,8 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD if (archive) { try { - final String archiveDirVal = NiFiProperties.getInstance().getProperty(CONFIGURATION_ARCHIVE_DIR_KEY); - final Path archiveDir = (archiveDirVal == null || archiveDirVal.equals("")) ? configFile.getParent().resolve("archive") : new File(archiveDirVal).toPath(); - Files.createDirectories(archiveDir); - - if (!Files.isDirectory(archiveDir)) { - throw new IOException("Archive directory doesn't appear to be a directory " + archiveDir); - } - final Path archiveFile = archiveDir.resolve(System.nanoTime() + "-" + configFile.toFile().getName()); - Files.copy(configFile, archiveFile); + final File archiveFile = createArchiveFile(); + Files.copy(configFile, archiveFile.toPath()); } catch (final Exception ex) { LOG.warn("Unable to archive flow configuration as requested due to " + ex); if (LOG.isDebugEnabled()) { @@ -158,4 +160,16 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD } } + @Override + public File createArchiveFile() throws IOException { + final String archiveDirVal = NiFiProperties.getInstance().getProperty(CONFIGURATION_ARCHIVE_DIR_KEY); + final Path archiveDir = (archiveDirVal == null || archiveDirVal.equals("")) ? flowXmlPath.getParent().resolve("archive") : new File(archiveDirVal).toPath(); + Files.createDirectories(archiveDir); + + if (!Files.isDirectory(archiveDir)) { + throw new IOException("Archive directory doesn't appear to be a directory " + archiveDir); + } + final Path archiveFile = archiveDir.resolve(System.nanoTime() + "-" + flowXmlPath.toFile().getName()); + return archiveFile.toFile(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/services/FlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/services/FlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/services/FlowService.java index 8519bb0..b6e1e1d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/services/FlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/services/FlowService.java @@ -115,4 +115,10 @@ public interface FlowService extends LifeCycle { */ FlowController getController(); + /** + * Creates a copy of the current flow and saves it in the configured 'archive' directory + * + * @throws IOException if unable to write to the archive directory + */ + void archiveFlow() throws IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index cf9c433..f7912a1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -169,7 +169,6 @@ nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string} nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout} nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout} nifi.zookeeper.root.node=${nifi.zookeeper.root.node} -nifi.zookeeper.access.control=${nifi.zookeeper.access.control} # cluster manager properties (only configure for cluster manager) # nifi.cluster.is.manager=${nifi.cluster.is.manager} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/SnippetAuditor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/SnippetAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/SnippetAuditor.java index b71636b..bb96e88 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/SnippetAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/SnippetAuditor.java @@ -271,7 +271,7 @@ public class SnippetAuditor extends NiFiAuditor { // create move audit records for all items in this snippet final Collection<Action> actions = new ArrayList<>(); - for (String id : snippet.getProcessors()) { + for (String id : snippet.getProcessors().keySet()) { final ProcessorNode processor = processorDAO.getProcessor(id); final Action action = processorAuditor.generateAuditRecord(processor, Operation.Move, createMoveDetails(previousGroupId, groupId, logger)); if (action != null) { @@ -279,7 +279,7 @@ public class SnippetAuditor extends NiFiAuditor { } } - for (String id : snippet.getFunnels()) { + for (String id : snippet.getFunnels().keySet()) { final Funnel funnel = funnelDAO.getFunnel(id); final Action action = funnelAuditor.generateAuditRecord(funnel, Operation.Move, createMoveDetails(previousGroupId, groupId, logger)); if (action != null) { @@ -287,7 +287,7 @@ public class SnippetAuditor extends NiFiAuditor { } } - for (String id : snippet.getInputPorts()) { + for (String id : snippet.getInputPorts().keySet()) { final Port port = inputPortDAO.getPort(id); final Action action = portAuditor.generateAuditRecord(port, Operation.Move, createMoveDetails(previousGroupId, groupId, logger)); if (action != null) { @@ -295,7 +295,7 @@ public class SnippetAuditor extends NiFiAuditor { } } - for (String id : snippet.getOutputPorts()) { + for (String id : snippet.getOutputPorts().keySet()) { final Port port = outputPortDAO.getPort(id); final Action action = portAuditor.generateAuditRecord(port, Operation.Move, createMoveDetails(previousGroupId, groupId, logger)); if (action != null) { @@ -303,7 +303,7 @@ public class SnippetAuditor extends NiFiAuditor { } } - for (String id : snippet.getRemoteProcessGroups()) { + for (String id : snippet.getRemoteProcessGroups().keySet()) { final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(id); final Action action = remoteProcessGroupAuditor.generateAuditRecord(remoteProcessGroup, Operation.Move, createMoveDetails(previousGroupId, groupId, logger)); if (action != null) { @@ -311,7 +311,7 @@ public class SnippetAuditor extends NiFiAuditor { } } - for (String id : snippet.getProcessGroups()) { + for (String id : snippet.getProcessGroups().keySet()) { final ProcessGroupDAO processGroupDAO = getProcessGroupDAO(); final ProcessGroup processGroup = processGroupDAO.getProcessGroup(id); final Action action = processGroupAuditor.generateAuditRecord(processGroup, Operation.Move, createMoveDetails(previousGroupId, groupId, logger)); @@ -320,7 +320,7 @@ public class SnippetAuditor extends NiFiAuditor { } } - for (String id : snippet.getConnections()) { + for (String id : snippet.getConnections().keySet()) { final Connection connection = connectionDAO.getConnection(id); final Action action = relationshipAuditor.generateAuditRecordForConnection(connection, Operation.Move, createMoveDetails(previousGroupId, groupId, logger)); if (action != null) { @@ -356,38 +356,38 @@ public class SnippetAuditor extends NiFiAuditor { if (snippet.isLinked()) { // locate all the components being removed final Set<Funnel> funnels = new HashSet<>(); - for (String id : snippet.getFunnels()) { + for (String id : snippet.getFunnels().keySet()) { funnels.add(funnelDAO.getFunnel(id)); } final Set<Port> inputPorts = new HashSet<>(); - for (String id : snippet.getInputPorts()) { + for (String id : snippet.getInputPorts().keySet()) { inputPorts.add(inputPortDAO.getPort(id)); } final Set<Port> outputPorts = new HashSet<>(); - for (String id : snippet.getOutputPorts()) { + for (String id : snippet.getOutputPorts().keySet()) { outputPorts.add(outputPortDAO.getPort(id)); } final Set<RemoteProcessGroup> remoteProcessGroups = new HashSet<>(); - for (String id : snippet.getRemoteProcessGroups()) { + for (String id : snippet.getRemoteProcessGroups().keySet()) { remoteProcessGroups.add(remoteProcessGroupDAO.getRemoteProcessGroup(id)); } final Set<ProcessGroup> processGroups = new HashSet<>(); final ProcessGroupDAO processGroupDAO = getProcessGroupDAO(); - for (String id : snippet.getProcessGroups()) { + for (String id : snippet.getProcessGroups().keySet()) { processGroups.add(processGroupDAO.getProcessGroup(id)); } final Set<ProcessorNode> processors = new HashSet<>(); - for (String id : snippet.getProcessors()) { + for (String id : snippet.getProcessors().keySet()) { processors.add(processorDAO.getProcessor(id)); } final Set<Connection> connections = new HashSet<>(); - for (String id : snippet.getConnections()) { + for (String id : snippet.getConnections().keySet()) { connections.add(connectionDAO.getConnection(id)); }
