This is an automated email from the ASF dual-hosted git repository.
thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e19940e NIFI-8466: Resolving offload bug with Single Node load
balanced queues
e19940e is described below
commit e19940ea7ecff9b88b1fc2ac1aa5c1c8b3c3a777
Author: Joe Gresock <[email protected]>
AuthorDate: Fri Apr 23 15:22:28 2021 -0400
NIFI-8466: Resolving offload bug with Single Node load balanced queues
Signed-off-by: Nathan Gough <[email protected]>
This closes #5025.
---
.../clustered/SocketLoadBalancedFlowFileQueue.java | 39 +++++++++++++---------
.../TestSocketLoadBalancedFlowFileQueue.java | 28 ++++++++++++++++
2 files changed, 51 insertions(+), 16 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 983fca2..0c6c1d8 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -1169,22 +1169,29 @@ public class SocketLoadBalancedFlowFileQueue extends
AbstractFlowFileQueue imple
partitionWriteLock.lock();
try {
if (!offloaded) {
- return;
- }
-
- switch (newState) {
- case CONNECTED:
- if (nodeId != null &&
nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
- // the node with this queue was connected to the
cluster, make sure the queue is not offloaded
- resetOffloadedQueue();
- }
- break;
- case OFFLOADED:
- case OFFLOADING:
- case DISCONNECTED:
- case DISCONNECTING:
- onNodeRemoved(nodeId);
- break;
+ switch (newState) {
+ case OFFLOADING:
+ onNodeRemoved(nodeId);
+ break;
+ case CONNECTED:
+ onNodeAdded(nodeId);
+ break;
+ }
+ } else {
+ switch (newState) {
+ case CONNECTED:
+ if (nodeId != null &&
nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
+ // the node with this queue was connected to
the cluster, make sure the queue is not offloaded
+ resetOffloadedQueue();
+ }
+ break;
+ case OFFLOADED:
+ case OFFLOADING:
+ case DISCONNECTED:
+ case DISCONNECTING:
+ onNodeRemoved(nodeId);
+ break;
+ }
}
} finally {
partitionWriteLock.unlock();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index 8959959..0deb917 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.queue.clustered;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.MockFlowFileRecord;
@@ -428,6 +429,33 @@ public class TestSocketLoadBalancedFlowFileQueue {
}
}
+ @Test
+ public void testOffloadAndReconnectKeepsQueueInCorrectOrder() {
+ // Simulate FirstNodePartitioner, which always selects the first node
in the partition queue
+ queue.setFlowFilePartitioner(new StaticFlowFilePartitioner(0));
+
+ QueuePartition firstPartition = queue.putAndGetPartition(new
MockFlowFileRecord());
+
+ final NodeIdentifier node1Identifier = nodeIds.get(0);
+ final NodeIdentifier node2Identifier = nodeIds.get(1);
+
+ // The local node partition starts out first
+ Assert.assertEquals("local", firstPartition.getSwapPartitionName());
+
+ // Simulate offloading the first node
+ clusterTopologyEventListener.onNodeStateChange(node1Identifier,
NodeConnectionState.OFFLOADING);
+
+ // Now the remote partition for the second node should be returned
+ firstPartition = queue.putAndGetPartition(new MockFlowFileRecord());
+ Assert.assertEquals(node2Identifier,
firstPartition.getNodeIdentifier().get());
+
+ // Simulate reconnecting the first node
+ clusterTopologyEventListener.onNodeStateChange(node1Identifier,
NodeConnectionState.CONNECTED);
+
+ // Now the local node partition is returned again
+ firstPartition = queue.putAndGetPartition(new MockFlowFileRecord());
+ Assert.assertEquals("local", firstPartition.getSwapPartitionName());
+ }
@Test(timeout = 30000)
public void
testChangeInClusterTopologyTriggersRebalanceOnlyOnRemovedNodeIfNecessary()
throws InterruptedException {