NIFI-5585 A node that was previously offloaded can now be reconnected to the cluster and queue flowfiles again. Added Spock test for NonLocalPartitionPartitioner. Updated NOTICE files for FontAwesome with the updated version (4.7.0) and URL to the free license. Updated package-lock.json with the updated version of FontAwesome (4.7.0). Added method to FlowFileQueue interface to reset an offloaded queue. Queues that are offloaded now immediately have the offloaded status reset once offloading finishes. SocketLoadBalancedFlowFileQueue now ignores back-pressure when offloading flowfiles. Cleaned up JavaScript in nf-cluster-table.js when creating markup for the node operation icons. Fixed incorrect handling of a heartbeat from an offloaded node: heartbeats from offloading or offloaded nodes will now be reported as an event, and the heartbeat will be removed and ignored. Added unit tests and integration tests to cover offloading nodes. Updated Cluster integration test class with an accessor for the current cluster coordinator. Updated Node integration test class's custom NiFiProperties implementation to return the load balancing port, and added a method to assert that a node is offloaded. Added exclusion to top-level pom for ITSpec.class.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/01e2098d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/01e2098d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/01e2098d Branch: refs/heads/master Commit: 01e2098d242f45f519bee0572de3c86cc7837645 Parents: be2c24c Author: Jeff Storck <[email protected]> Authored: Tue Sep 25 15:17:19 2018 -0400 Committer: Mark Payne <[email protected]> Committed: Thu Oct 11 09:23:01 2018 -0400 ---------------------------------------------------------------------- nifi-assembly/NOTICE | 2 +- .../nifi/controller/queue/FlowFileQueue.java | 14 ++ .../src/main/resources/META-INF/NOTICE | 2 +- .../coordination/ClusterCoordinator.java | 9 +- .../heartbeat/AbstractHeartbeatMonitor.java | 12 +- .../ThreadPoolRequestReplicator.java | 1 + .../node/NodeClusterCoordinator.java | 3 +- .../node/NodeClusterCoordinatorSpec.groovy | 99 ++++++++++++++ .../integration/OffloadNodeITSpec.groovy | 50 ++++++++ .../nifi/cluster/integration/Cluster.java | 4 + .../apache/nifi/cluster/integration/Node.java | 15 +++ .../nifi/controller/StandardFlowService.java | 6 +- .../controller/queue/StandardFlowFileQueue.java | 4 + .../SocketLoadBalancedFlowFileQueue.java | 72 ++++++++--- .../controller/StandardFlowServiceSpec.groovy | 128 +++++++++++++++++++ .../NonLocalPartitionPartitionerSpec.groovy | 107 ++++++++++++++++ .../TestWriteAheadFlowFileRepository.java | 4 + .../nifi/web/StandardNiFiServiceFacade.java | 3 +- .../src/main/frontend/package-lock.json | 6 +- .../src/main/resources/META-INF/NOTICE | 2 +- .../webapp/js/nf/cluster/nf-cluster-table.js | 41 +++--- .../impl/command/nifi/nodes/DisconnectNode.java | 2 +- pom.xml | 3 + 23 files changed, 527 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-assembly/NOTICE 
---------------------------------------------------------------------- diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 0bea06a..dd2bf6d 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -1873,4 +1873,4 @@ SIL OFL 1.1 ****************** The following binary components are provided under the SIL Open Font License 1.1 - (SIL OFL 1.1) FontAwesome (4.6.1 - http://fortawesome.github.io/Font-Awesome/license/) + (SIL OFL 1.1) FontAwesome (4.7.0 - https://fontawesome.com/license/free) http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 7cd0e30..8870f1d 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -267,8 +267,22 @@ public interface FlowFileQueue { void setLoadBalanceStrategy(LoadBalanceStrategy strategy, String partitioningAttribute); + /** + * Offloads the flowfiles in the queue to other nodes. This disables the queue from partition flowfiles locally. + * <p> + * This operation is a no-op if the node that contains this queue is not in a cluster. + */ void offloadQueue(); + /** + * Resets a queue that has previously been offloaded. This allows the queue to partition flowfiles locally, and + * has no other effect on processors or remote process groups. + * <p> + * This operation is a no-op if the queue is not currently offloaded or the node that contains this queue is not + * clustered. 
+ */ + void resetOffloadedQueue(); + LoadBalanceStrategy getLoadBalanceStrategy(); void setLoadBalanceCompression(LoadBalanceCompression compression); http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE index e6a7322..fae5c91 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE @@ -212,6 +212,6 @@ SIL OFL 1.1 ****************** The following binary components are provided under the SIL Open Font License 1.1 - (SIL OFL 1.1) FontAwesome (4.6.1 - http://fortawesome.github.io/Font-Awesome/license/) + (SIL OFL 1.1) FontAwesome (4.7.0 - https://fontawesome.com/license/free) http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index 2ad0e70..fad51d1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -72,7 +72,14 @@ public interface ClusterCoordinator { /** * Sends a request to the node to be offloaded. * The node will be marked as offloading immediately. - * + * <p> + * When a node is offloaded: + * <ul> + * <li>all processors on the node are stopped</li> + * <li>all processors on the node are terminated</li> + * <li>all remote process groups on the node stop transmitting</li> + * <li>all flowfiles on the node are sent to other nodes in the cluster</li> + * </ul> * @param nodeId the identifier of the node * @param offloadCode the code that represents why this node is being asked to be offloaded * @param explanation an explanation as to why the node is being asked to be offloaded http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index f6d09ab..5fbe3f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -228,12 +228,14 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { return; } - if 
(NodeConnectionState.OFFLOADED == connectionState) { - // Cluster Coordinator believes that node is offloaded, but let the node reconnect - clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node that is offloaded. " + - "Marking as Disconnected and requesting that Node reconnect to cluster"); - clusterCoordinator.requestNodeConnect(nodeId, null); + if (NodeConnectionState.OFFLOADED == connectionState || NodeConnectionState.OFFLOADING == connectionState) { + // Cluster Coordinator can ignore this heartbeat since the node is offloaded + clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node that is offloading " + + "or offloaded. Removing this heartbeat. Offloaded nodes will only be reconnected to the cluster by an " + + "explicit connection request or restarting the node."); + removeHeartbeat(nodeId); } + if (NodeConnectionState.DISCONNECTED == connectionState) { // ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is // the only node. 
We allow it if it is the only node because if we have a one-node cluster, then http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 0b2f1fe..b3a3ab9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -31,6 +31,7 @@ import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestExc import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; +import org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.exception.UriConstructionException; import org.apache.nifi.cluster.protocol.NodeIdentifier; 
http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 5b90e76..8c83a1d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -486,11 +486,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } if (state != NodeConnectionState.OFFLOADING) { - logger.warn("Attempted to finish node offload for {} but node is not in a offload state, it is currently {}.", nodeId, state); + logger.warn("Attempted to finish node offload for {} but node is not in the offloading state, it is currently {}.", nodeId, state); return; } logger.info("{} is now offloaded", nodeId); + updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinatorSpec.groovy ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinatorSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinatorSpec.groovy new file mode 100644 index 0000000..2751b3e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinatorSpec.groovy @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.coordination.node + +import org.apache.nifi.cluster.coordination.flow.FlowElection +import org.apache.nifi.cluster.firewall.ClusterNodeFirewall +import org.apache.nifi.cluster.protocol.NodeIdentifier +import org.apache.nifi.cluster.protocol.NodeProtocolSender +import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener +import org.apache.nifi.cluster.protocol.message.OffloadMessage +import org.apache.nifi.components.state.Scope +import org.apache.nifi.components.state.StateManager +import org.apache.nifi.components.state.StateManagerProvider +import org.apache.nifi.controller.leader.election.LeaderElectionManager +import org.apache.nifi.events.EventReporter +import org.apache.nifi.reporting.Severity +import org.apache.nifi.state.MockStateMap +import org.apache.nifi.util.NiFiProperties +import org.apache.nifi.web.revision.RevisionManager +import spock.lang.Specification +import spock.util.concurrent.BlockingVariable + +import java.util.concurrent.TimeUnit + +class NodeClusterCoordinatorSpec extends Specification { + def "requestNodeOffload"() { + given: 'mocked collaborators' + def clusterCoordinationProtocolSenderListener = Mock(ClusterCoordinationProtocolSenderListener) + def eventReporter = Mock EventReporter + def stateManager = Mock StateManager + def stateMap = new MockStateMap([:], 1) + stateManager.getState(_ as Scope) >> stateMap + def stateManagerProvider = Mock StateManagerProvider + stateManagerProvider.getStateManager(_ as String) >> stateManager + + and: 'a NodeClusterCoordinator that manages node status in a synchronized list' + List<NodeConnectionStatus> nodeStatuses = [].asSynchronized() + def clusterCoordinator = new NodeClusterCoordinator(clusterCoordinationProtocolSenderListener, eventReporter, Mock(LeaderElectionManager), + Mock(FlowElection), Mock(ClusterNodeFirewall), + Mock(RevisionManager), NiFiProperties.createBasicNiFiProperties('src/test/resources/conf/nifi.properties', 
[:]), + Mock(NodeProtocolSender), stateManagerProvider) { + @Override + void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { + nodeStatuses.add(updatedStatus) + } + } + + and: 'two nodes' + def nodeIdentifier1 = createNodeIdentifier 1 + def nodeIdentifier2 = createNodeIdentifier 2 + + and: 'node 1 is connected, node 2 is disconnected' + clusterCoordinator.updateNodeStatus new NodeConnectionStatus(nodeIdentifier1, NodeConnectionState.CONNECTED) + clusterCoordinator.updateNodeStatus new NodeConnectionStatus(nodeIdentifier2, NodeConnectionState.DISCONNECTED) + while (nodeStatuses.size() < 2) { + Thread.sleep(10) + } + nodeStatuses.clear() + + def waitForReportEvent = new BlockingVariable(5, TimeUnit.SECONDS) + + when: 'a node is requested to offload' + clusterCoordinator.requestNodeOffload nodeIdentifier2, OffloadCode.OFFLOADED, 'unit test for offloading node' + waitForReportEvent.get() + + then: 'no exceptions are thrown' + noExceptionThrown() + + and: 'expected methods on collaborators are invoked' + 1 * clusterCoordinationProtocolSenderListener.offload({ OffloadMessage msg -> msg.nodeId == nodeIdentifier2 } as OffloadMessage) + 1 * eventReporter.reportEvent(Severity.INFO, 'Clustering', { msg -> msg.contains "$nodeIdentifier2.apiAddress:$nodeIdentifier2.apiPort" } as String) >> { + waitForReportEvent.set(it) + } + + and: 'the status of the offloaded node is known by the cluster coordinator to be offloading' + nodeStatuses[0].nodeIdentifier == nodeIdentifier2 + nodeStatuses[0].state == NodeConnectionState.OFFLOADING + } + + private static NodeIdentifier createNodeIdentifier(final int index) { + new NodeIdentifier("node-id-$index", "localhost", 8000 + index, "localhost", 9000 + index, + "localhost", 10000 + index, 11000 + index, false) + } + +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/integration/OffloadNodeITSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/integration/OffloadNodeITSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/integration/OffloadNodeITSpec.groovy new file mode 100644 index 0000000..a8dd158 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/integration/OffloadNodeITSpec.groovy @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.integration + +import org.apache.nifi.cluster.coordination.node.DisconnectionCode +import org.apache.nifi.cluster.coordination.node.OffloadCode +import spock.lang.Specification + +import java.util.concurrent.TimeUnit + +class OffloadNodeITSpec extends Specification { + def "requestNodeOffload"() { + given: 'a cluster with 3 nodes' + System.setProperty 'nifi.properties.file.path', 'src/test/resources/conf/nifi.properties' + def cluster = new Cluster() + cluster.start() + cluster.createNode() + def nodeToOffload = cluster.createNode() + cluster.createNode() + cluster.waitUntilAllNodesConnected 20, TimeUnit.SECONDS + + when: 'the node to offload is disconnected successfully' + cluster.currentClusterCoordinator.clusterCoordinator.requestNodeDisconnect nodeToOffload.identifier, DisconnectionCode.USER_DISCONNECTED, + 'integration test user disconnect' + cluster.currentClusterCoordinator.assertNodeDisconnects nodeToOffload.identifier, 10, TimeUnit.SECONDS + + and: 'the node to offload is requested to offload' + nodeToOffload.getClusterCoordinator().requestNodeOffload nodeToOffload.identifier, OffloadCode.OFFLOADED, 'integration test offload' + + then: 'the node has been successfully offloaded' + cluster.currentClusterCoordinator.assertNodeIsOffloaded nodeToOffload.identifier, 10, TimeUnit.SECONDS + + cleanup: + cluster.stop() + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java index dab073d..370d6dc 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java @@ -144,6 +144,10 @@ public class Cluster { return node; } + public Node getCurrentClusterCoordinator() { + return getNodes().stream().filter(node -> node.hasRole(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null); + } + public Node waitForClusterCoordinator(final long time, final TimeUnit timeUnit) { return ClusterUtils.waitUntilNonNull(time, timeUnit, () -> getNodes().stream().filter(node -> node.hasRole(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null)); http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index 3133736..b2a499a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -108,6 +108,8 @@ public class Node { return String.valueOf(nodeId.getSocketPort()); }else if(key.equals(NiFiProperties.WEB_HTTP_PORT)){ return String.valueOf(nodeId.getApiPort()); + }else if(key.equals(NiFiProperties.LOAD_BALANCE_PORT)){ + return String.valueOf(nodeId.getLoadBalancePort()); }else { return properties.getProperty(key); } @@ -386,4 +388,17 
@@ public class Node { public void assertNodeIsConnected(final NodeIdentifier nodeId) { Assert.assertEquals(NodeConnectionState.CONNECTED, getClusterCoordinator().getConnectionStatus(nodeId).getState()); } + + /** + * Assert that the node with the given ID is offloaded (according to this node!) within the given amount of time + * + * @param nodeId id of the node + * @param time how long to wait + * @param timeUnit unit of time provided by the 'time' argument + */ + public void assertNodeIsOffloaded(final NodeIdentifier nodeId, final long time, final TimeUnit timeUnit) { + ClusterUtils.waitUntilConditionMet(time, timeUnit, + () -> getClusterCoordinator().getConnectionStatus(nodeId).getState() == NodeConnectionState.OFFLOADED, + () -> "Connection Status is " + getClusterCoordinator().getConnectionStatus(nodeId).toString()); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 297595f..957357b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -691,20 +691,19 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private void offload(final String explanation) throws InterruptedException { writeLock.lock(); try { - logger.info("Offloading node due to " + explanation); // mark node as 
offloading controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, OffloadCode.OFFLOADED, explanation)); // request to stop all processors on node controller.stopAllProcessors(); - // request to stop all remote process groups - controller.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::stopTransmitting); // terminate all processors controller.getRootGroup().findAllProcessors() // filter stream, only stopped processors can be terminated .stream().filter(pn -> pn.getScheduledState() == ScheduledState.STOPPED) .forEach(pn -> pn.getProcessGroup().terminateProcessor(pn)); + // request to stop all remote process groups + controller.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::stopTransmitting); // offload all queues on node controller.getAllQueues().forEach(FlowFileQueue::offloadQueue); // wait for rebalance of flowfiles on all queues @@ -713,6 +712,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { Thread.sleep(1000); } // finish offload + controller.getAllQueues().forEach(FlowFileQueue::resetOffloadedQueue); controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED, OffloadCode.OFFLOADED, explanation)); clusterCoordinator.finishNodeOffload(getNodeId()); http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java index ee222f4..8872ba7 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java @@ -79,6 +79,10 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow } @Override + public void resetOffloadedQueue() { + } + + @Override public boolean isActivelyLoadBalancing() { return false; } http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index e99d17d..7b3a211 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -189,34 +189,41 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple if (!offloaded) { // We are already load balancing but are changing how we are load balancing. 
final FlowFilePartitioner partitioner; - switch (strategy) { - case DO_NOT_LOAD_BALANCE: - partitioner = new LocalPartitionPartitioner(); - break; - case PARTITION_BY_ATTRIBUTE: - partitioner = new CorrelationAttributePartitioner(partitioningAttribute); - break; - case ROUND_ROBIN: - partitioner = new RoundRobinPartitioner(); - break; - case SINGLE_NODE: - partitioner = new FirstNodePartitioner(); - break; - default: - throw new IllegalArgumentException(); - } + partitioner = getPartitionerForLoadBalancingStrategy(strategy, partitioningAttribute); setFlowFilePartitioner(partitioner); } } + private FlowFilePartitioner getPartitionerForLoadBalancingStrategy(LoadBalanceStrategy strategy, String partitioningAttribute) { + FlowFilePartitioner partitioner; + switch (strategy) { + case DO_NOT_LOAD_BALANCE: + partitioner = new LocalPartitionPartitioner(); + break; + case PARTITION_BY_ATTRIBUTE: + partitioner = new CorrelationAttributePartitioner(partitioningAttribute); + break; + case ROUND_ROBIN: + partitioner = new RoundRobinPartitioner(); + break; + case SINGLE_NODE: + partitioner = new FirstNodePartitioner(); + break; + default: + throw new IllegalArgumentException(); + } + return partitioner; + } + @Override public void offloadQueue() { if (clusterCoordinator == null) { - // Not clustered, so don't change partitions + // Not clustered, cannot offload the queue to other nodes return; } + logger.debug("Setting queue {} on node {} as offloaded", this, clusterCoordinator.getLocalNodeIdentifier()); offloaded = true; partitionWriteLock.lock(); @@ -248,6 +255,26 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } } + @Override + public void resetOffloadedQueue() { + if (clusterCoordinator == null) { + // Not clustered, was not offloading the queue to other nodes + return; + } + + if (offloaded) { + // queue was offloaded previously, allow files to be added to the local partition + offloaded = false; + logger.debug("Queue {} on node {} was 
previously offloaded, resetting offloaded status to {}", + this, clusterCoordinator.getLocalNodeIdentifier(), offloaded); + // reset the partitioner based on the load balancing strategy, since offloading previously changed the partitioner + FlowFilePartitioner partitioner = getPartitionerForLoadBalancingStrategy(getLoadBalanceStrategy(), getPartitioningAttribute()); + setFlowFilePartitioner(partitioner); + logger.debug("Queue {} is no longer offloaded, restored load balance strategy to {} and partitioning attribute to \"{}\"", + this, getLoadBalanceStrategy(), getPartitioningAttribute()); + } + } + public synchronized void startLoadBalancing() { logger.debug("{} started. Will begin distributing FlowFiles across the cluster", this); @@ -884,8 +911,9 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple @Override public boolean isPropagateBackpressureAcrossNodes() { - // TODO: We will want to modify this when we have the ability to offload flowfiles from a node. 
- return true; + // If offloaded = false, the queue is not offloading; return true to honor backpressure + // If offloaded = true, the queue is offloading or has finished offloading; return false to ignore backpressure + return !offloaded; } @Override @@ -1096,6 +1124,12 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } switch (newState) { + case CONNECTED: + if (nodeId != null && nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { + // the node with this queue was connected to the cluster, make sure the queue is not offloaded + resetOffloadedQueue(); + } + break; case OFFLOADED: case OFFLOADING: case DISCONNECTED: http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowServiceSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowServiceSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowServiceSpec.groovy new file mode 100644 index 0000000..63fedab --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowServiceSpec.groovy @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller + +import org.apache.nifi.authorization.Authorizer +import org.apache.nifi.cluster.coordination.ClusterCoordinator +import org.apache.nifi.cluster.coordination.node.NodeConnectionState +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus +import org.apache.nifi.cluster.coordination.node.OffloadCode +import org.apache.nifi.cluster.protocol.NodeIdentifier +import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener +import org.apache.nifi.cluster.protocol.message.OffloadMessage +import org.apache.nifi.components.state.Scope +import org.apache.nifi.components.state.StateManager +import org.apache.nifi.components.state.StateManagerProvider +import org.apache.nifi.controller.queue.FlowFileQueue +import org.apache.nifi.controller.status.ProcessGroupStatus +import org.apache.nifi.encrypt.StringEncryptor +import org.apache.nifi.groups.ProcessGroup +import org.apache.nifi.groups.RemoteProcessGroup +import org.apache.nifi.state.MockStateMap +import org.apache.nifi.util.NiFiProperties +import org.apache.nifi.web.revision.RevisionManager +import spock.lang.Specification +import spock.util.concurrent.BlockingVariable + +import java.util.concurrent.TimeUnit + +class StandardFlowServiceSpec extends Specification { + def "handle an OffloadMessage"() { + given: 'a node to offload' + def nodeToOffload = createNodeIdentifier 1 + + and: 'a simple flow with one root group and a single processor' + def stateManager = Mock StateManager + def stateMap = new MockStateMap([:], 1) + stateManager.getState(_ as 
Scope) >> stateMap + def stateManagerProvider = Mock StateManagerProvider + stateManagerProvider.getStateManager(_ as String) >> stateManager + + def rootGroup = Mock ProcessGroup + def flowController = Mock FlowController + flowController.getStateManagerProvider() >> stateManagerProvider + _ * flowController.rootGroup >> rootGroup + + def clusterCoordinator = Mock ClusterCoordinator + + def processGroupStatus = Mock ProcessGroupStatus + def processorNode = Mock ProcessorNode + def remoteProcessGroup = Mock RemoteProcessGroup + def flowFileQueue = Mock FlowFileQueue + + and: 'a flow service to handle the OffloadMessage' + def flowService = StandardFlowService.createClusteredInstance(flowController, NiFiProperties.createBasicNiFiProperties('src/test/resources/conf/nifi.properties', + [(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT): nodeToOffload.socketPort as String, + (NiFiProperties.WEB_HTTP_PORT) : nodeToOffload.apiPort as String, + (NiFiProperties.LOAD_BALANCE_PORT) : nodeToOffload.loadBalancePort as String]), + Mock(NodeProtocolSenderListener), clusterCoordinator, Mock(StringEncryptor), Mock(RevisionManager), Mock(Authorizer)) + + def waitForFinishOffload = new BlockingVariable(5, TimeUnit.SECONDS)//new CountDownLatch(1) + + when: 'the flow services receives an OffloadMessage' + flowService.handle(new OffloadMessage(nodeId: nodeToOffload, explanation: 'unit test offload'), [] as Set) + waitForFinishOffload.get() + + then: 'no exceptions are thrown' + noExceptionThrown() + + and: 'the connection status for the node in the flow controller is set to OFFLOADING' + 1 * flowController.setConnectionStatus({ NodeConnectionStatus status -> + status.nodeIdentifier.logicallyEquals(nodeToOffload) && status.state == NodeConnectionState.OFFLOADING && status.offloadCode == OffloadCode.OFFLOADED + } as NodeConnectionStatus) + + then: 'all processors are requested to stop' + 1 * flowController.stopAllProcessors() + + then: 'all processors are requested to terminate' + 1 * 
processorNode.scheduledState >> ScheduledState.STOPPED + 1 * processorNode.processGroup >> rootGroup + 1 * rootGroup.terminateProcessor({ ProcessorNode pn -> pn == processorNode } as ProcessorNode) + 1 * rootGroup.findAllProcessors() >> [processorNode] + + then: 'all remote process groups are requested to terminate' + 1 * remoteProcessGroup.stopTransmitting() + 1 * rootGroup.findAllRemoteProcessGroups() >> [remoteProcessGroup] + + then: 'all queues are requested to offload' + 1 * flowFileQueue.offloadQueue() + 1 * flowController.getAllQueues() >> [flowFileQueue] + + then: 'the queued count in the flow controller status is 0 to allow the offloading code to to complete' + 1 * flowController.getControllerStatus() >> processGroupStatus + 1 * processGroupStatus.getQueuedCount() >> 0 + + then: 'all queues are requested to reset to the original partitioner for the load balancing strategy' + 1 * flowFileQueue.resetOffloadedQueue() + 1 * flowController.getAllQueues() >> [flowFileQueue] + + then: 'the connection status for the node in the flow controller is set to OFFLOADED' + 1 * flowController.setConnectionStatus({ NodeConnectionStatus status -> + status.nodeIdentifier.logicallyEquals(nodeToOffload) && status.state == NodeConnectionState.OFFLOADED && status.offloadCode == OffloadCode.OFFLOADED + } as NodeConnectionStatus) + + then: 'the cluster coordinator is requested to finish the node offload' + 1 * clusterCoordinator.finishNodeOffload({ NodeIdentifier nodeIdentifier -> + nodeIdentifier.logicallyEquals(nodeToOffload) + } as NodeIdentifier) >> { waitForFinishOffload.set(it) } + } + + private static NodeIdentifier createNodeIdentifier(final int index) { + new NodeIdentifier("node-id-$index", "localhost", 8000 + index, "localhost", 9000 + index, + "localhost", 10000 + index, 11000 + index, false) + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitionerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitionerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitionerSpec.groovy new file mode 100644 index 0000000..f3cfd4c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitionerSpec.groovy @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.queue.clustered.partition + + +import org.apache.nifi.controller.repository.FlowFileRecord +import spock.lang.Specification +import spock.lang.Unroll + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +class NonLocalPartitionPartitionerSpec extends Specification { + + def "getPartition chooses local partition with 1 partition and throws IllegalStateException"() { + given: "a local partitioner using a local partition" + def partitioner = new NonLocalPartitionPartitioner() + def localPartition = Mock QueuePartition + def partitions = [localPartition] as QueuePartition[] + def flowFileRecord = Mock FlowFileRecord + + when: "a partition is requested from the partitioner" + partitioner.getPartition flowFileRecord, partitions, localPartition + + then: "an IllegalStateExceptions thrown" + thrown(IllegalStateException) + } + + @Unroll + def "getPartition chooses non-local partition with #maxPartitions partitions, #threads threads, #iterations iterations"() { + given: "a local partitioner" + def partitioner = new NonLocalPartitionPartitioner() + def partitions = new QueuePartition[maxPartitions] + + and: "a local partition" + def localPartition = Mock QueuePartition + partitions[0] = localPartition + + and: "one or more multiple partitions" + for (int id = 1; id < maxPartitions; ++id) { + def partition = Mock QueuePartition + partitions[id] = partition + } + + and: "an array to hold the resulting chosen partitions and an executor service with one or more threads" + def flowFileRecord = Mock FlowFileRecord + def chosenPartitions = [] as ConcurrentLinkedQueue + def executorService = Executors.newFixedThreadPool threads + + when: "a partition is requested from the partitioner for a given flowfile record and the existing partitions" + iterations.times { + executorService.submit { + chosenPartitions.add partitioner.getPartition(flowFileRecord, partitions, 
localPartition) + } + } + executorService.shutdown() + try { + while (!executorService.awaitTermination(10, TimeUnit.MILLISECONDS)) { + Thread.sleep(10) + } + } catch (InterruptedException e) { + executorService.shutdownNow() + Thread.currentThread().interrupt() + } + + then: "no exceptions are thrown" + noExceptionThrown() + + and: "there is a chosen partition for each iteration" + chosenPartitions.size() == iterations + + and: "each chosen partition is a remote partition and is one of the existing partitions" + def validChosenPartitions = chosenPartitions.findAll { it != localPartition && partitions.contains(it) } + + and: "there is a valid chosen partition for each iteration" + validChosenPartitions.size() == iterations + + and: "there are no other mock interactions" + 0 * _ + + where: + maxPartitions | threads | iterations + 2 | 1 | 1 + 2 | 1 | 10 + 2 | 1 | 100 + 2 | 10 | 1000 + 5 | 1 | 1 + 5 | 1 | 10 + 5 | 1 | 100 + 5 | 10 | 1000 + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 878ad13..a3ee5c1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -114,6 +114,10 @@ public class 
TestWriteAheadFlowFileRepository { } @Override + public void resetOffloadedQueue() { + } + + @Override public boolean isActivelyLoadBalancing() { return false; } http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 4ef241e..d875118 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -4707,7 +4707,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeIdentifier); if (!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && !nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) { - throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId + " because it is not disconnected, current state = " + nodeConnectionStatus.getState()); + throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId + + " because it is not disconnected or offloaded, current state = " + nodeConnectionStatus.getState()); } clusterCoordinator.removeNode(nodeIdentifier, userDn); http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json index 1d848a6..3f505a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json @@ -362,9 +362,9 @@ "integrity": "sha512-ipiDYhdQSCZ4hSbX4rMW+XzNKMD1prg/sTvoVmSLkuQ1MVlwjJQQA+sW8tMYR3BLUr9KjodFV4pvzunvRhd33Q==" }, "font-awesome": { - "version": "4.6.1", - "resolved": "https://registry.npmjs.org/font-awesome/-/font-awesome-4.6.1.tgz", - "integrity": "sha1-VHJl+0xFu+2Qq4vE93qXs3uFKhI=" + "version": "4.7.0", + "resolved": "https://registry.npmjs.org/font-awesome/-/font-awesome-4.7.0.tgz", + "integrity": "sha1-j6jPBBGhoxr9B7BtKQK7n8gVoTM=" }, "has-color": { "version": "0.1.7", http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE index c0b74bc..300c6b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE @@ -8,4 +8,4 @@ SIL OFL 1.1 ****************** The following binary components are provided under the SIL Open Font License 1.1 - (SIL OFL 1.1) FontAwesome (4.6.1 - 
http://fortawesome.github.io/Font-Awesome/license/) + (SIL OFL 1.1) FontAwesome (4.7.0 - https://fontawesome.com/license/free) http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js index 0dc74d7..f90cd3a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js @@ -630,34 +630,25 @@ // only allow the admin to modify the cluster if (nfCommon.canModifyController()) { var actionFormatter = function (row, cell, value, columnDef, dataContext) { - var canDisconnect = false; - var canConnect = false; - var isOffloaded = false; - - // determine the current status + var connectDiv = '<div title="Connect" class="pointer prompt-for-connect fa fa-plug"></div>'; + var deleteDiv = '<div title="Delete" class="pointer prompt-for-removal fa fa-trash"></div>'; + var disconnectDiv = '<div title="Disconnect" class="pointer prompt-for-disconnect fa fa-power-off"></div>'; + var offloadDiv = '<div title="Offload" class="pointer prompt-for-offload fa fa-rotate-90 fa-upload" ' + + 'style="margin-top: 5px;margin-left: 5px;margin-right: -2px;"></div>'; + var markup = ''; + + // determine the current status and create the appropriate markup if (dataContext.status === 'CONNECTED' || dataContext.status === 'CONNECTING') { - canDisconnect = true; - } - if (dataContext.status === 'DISCONNECTED') { - canConnect = true; - } - if 
(dataContext.status === 'OFFLOADED') { - isOffloaded = true; - } - - // return the appropriate markup - if (canConnect) { - return '<div title="Connect" class="pointer prompt-for-connect fa fa-plug"></div>' + - '<div title="Delete" class="pointer prompt-for-removal fa fa-trash"></div>' + - '<div title="Offload" class="pointer prompt-for-offload fa fa-rotate-90 fa-upload"></div>'; - } else if (canDisconnect) { - return '<div title="Disconnect" class="pointer prompt-for-disconnect fa fa-power-off"></div>'; - } else if (isOffloaded) { - return '<div title="Connect" class="pointer prompt-for-connect fa fa-plug"></div>' + - '<div title="Delete" class="pointer prompt-for-removal fa fa-trash"></div>'; + markup += disconnectDiv; + } else if (dataContext.status === 'DISCONNECTED') { + markup += connectDiv + offloadDiv + deleteDiv; + } else if (dataContext.status === 'OFFLOADED') { + markup += connectDiv + deleteDiv; } else { - return '<div style="width: 16px; height: 16px;"> </div>'; + markup += '<div style="width: 16px; height: 16px;"> </div>'; } + + return markup; }; columnModel.push({ http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java index 98fa03a..65a7e72 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java @@ -57,7 +57,7 @@ public class DisconnectNode extends AbstractNiFiCommand<NodeResult> { NodeDTO nodeDto = new NodeDTO(); 
nodeDto.setNodeId(nodeId); - // TODO There's no constant for node status in + // TODO There are no constants for the DISCONNECT node status nodeDto.setStatus("DISCONNECTING"); NodeEntity nodeEntity = new NodeEntity(); nodeEntity.setNode(nodeDto); http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7d433b4..b86cada 100644 --- a/pom.xml +++ b/pom.xml @@ -342,6 +342,9 @@ <include>**/Test*.class</include> <include>**/*Spec.class</include> </includes> + <excludes> + <exclude>**/*ITSpec.class</exclude> + </excludes> <redirectTestOutputToFile>true</redirectTestOutputToFile> <argLine combine.children="append">-Xmx1G -Djava.net.preferIPv4Stack=true
