This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e19940e  NIFI-8466: Resolving offload bug with Single Node load 
balanced queues
e19940e is described below

commit e19940ea7ecff9b88b1fc2ac1aa5c1c8b3c3a777
Author: Joe Gresock <[email protected]>
AuthorDate: Fri Apr 23 15:22:28 2021 -0400

    NIFI-8466: Resolving offload bug with Single Node load balanced queues
    
    Signed-off-by: Nathan Gough <[email protected]>
    
    This closes #5025.
---
 .../clustered/SocketLoadBalancedFlowFileQueue.java | 39 +++++++++++++---------
 .../TestSocketLoadBalancedFlowFileQueue.java       | 28 ++++++++++++++++
 2 files changed, 51 insertions(+), 16 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 983fca2..0c6c1d8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -1169,22 +1169,29 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
             partitionWriteLock.lock();
             try {
                 if (!offloaded) {
-                    return;
-                }
-
-                switch (newState) {
-                    case CONNECTED:
-                        if (nodeId != null && 
nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
-                            // the node with this queue was connected to the 
cluster, make sure the queue is not offloaded
-                            resetOffloadedQueue();
-                        }
-                        break;
-                    case OFFLOADED:
-                    case OFFLOADING:
-                    case DISCONNECTED:
-                    case DISCONNECTING:
-                        onNodeRemoved(nodeId);
-                        break;
+                    switch (newState) {
+                        case OFFLOADING:
+                            onNodeRemoved(nodeId);
+                            break;
+                        case CONNECTED:
+                            onNodeAdded(nodeId);
+                            break;
+                    }
+                } else {
+                    switch (newState) {
+                        case CONNECTED:
+                            if (nodeId != null && 
nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
+                                // the node with this queue was connected to 
the cluster, make sure the queue is not offloaded
+                                resetOffloadedQueue();
+                            }
+                            break;
+                        case OFFLOADED:
+                        case OFFLOADING:
+                        case DISCONNECTED:
+                        case DISCONNECTING:
+                            onNodeRemoved(nodeId);
+                            break;
+                    }
                 }
             } finally {
                 partitionWriteLock.unlock();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index 8959959..0deb917 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.queue.clustered;
 
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.MockFlowFileRecord;
@@ -428,6 +429,33 @@ public class TestSocketLoadBalancedFlowFileQueue {
         }
     }
 
+    @Test
+    public void testOffloadAndReconnectKeepsQueueInCorrectOrder() {
+        // Simulate FirstNodePartitioner, which always selects the first node 
in the partition queue
+        queue.setFlowFilePartitioner(new StaticFlowFilePartitioner(0));
+
+        QueuePartition firstPartition = queue.putAndGetPartition(new 
MockFlowFileRecord());
+
+        final NodeIdentifier node1Identifier = nodeIds.get(0);
+        final NodeIdentifier node2Identifier = nodeIds.get(1);
+
+        // The local node partition starts out first
+        Assert.assertEquals("local", firstPartition.getSwapPartitionName());
+
+        // Simulate offloading the first node
+        clusterTopologyEventListener.onNodeStateChange(node1Identifier, 
NodeConnectionState.OFFLOADING);
+
+        // Now the remote partition for the second node should be returned
+        firstPartition = queue.putAndGetPartition(new MockFlowFileRecord());
+        Assert.assertEquals(node2Identifier, 
firstPartition.getNodeIdentifier().get());
+
+        // Simulate reconnecting the first node
+        clusterTopologyEventListener.onNodeStateChange(node1Identifier, 
NodeConnectionState.CONNECTED);
+
+        // Now the local node partition is returned again
+        firstPartition = queue.putAndGetPartition(new MockFlowFileRecord());
+        Assert.assertEquals("local", firstPartition.getSwapPartitionName());
+    }
 
     @Test(timeout = 30000)
     public void 
testChangeInClusterTopologyTriggersRebalanceOnlyOnRemovedNodeIfNecessary() 
throws InterruptedException {

Reply via email to