This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ab8b744  NIFI-8457: Fixed bug in load balanced connections that can result in the node never completing OFFLOAD action. Also fixed issue in which data destined for a disconnected/offloaded node was never rebalanced even for partitioning strategies that call for rebalancing on failure
ab8b744 is described below

commit ab8b7444b5587adfc7548a1da99c57040dc418c0
Author: Mark Payne <[email protected]>
AuthorDate: Wed Apr 21 16:57:39 2021 -0400

    NIFI-8457: Fixed bug in load balanced connections that can result in the node never completing OFFLOAD action. Also fixed issue in which data destined for a disconnected/offloaded node was never rebalanced even for partitioning strategies that call for rebalancing on failure
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #5019.
---
 .../client/async/nio/LoadBalanceSession.java       | 14 +++++
 .../async/nio/NioAsyncLoadBalanceClient.java       | 65 ++++++++++++++--------
 2 files changed, 57 insertions(+), 22 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
index 0169d07..011558b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
@@ -76,6 +76,7 @@ public class LoadBalanceSession {
     private final String peerDescription;
     private final String connectionId;
     private final TransactionThreshold transactionThreshold;
+    private volatile boolean canceled = false;
 
     final VersionNegotiator negotiator = new StandardVersionNegotiator(1);
     private int protocolVersion = 1;
@@ -170,6 +171,19 @@ public class LoadBalanceSession {
         }
     }
 
+    public synchronized boolean cancel() {
+        if (complete) {
+            return false;
+        }
+
+        complete = true;
+        canceled = true;
+        return true;
+    }
+
+    public boolean isCanceled() {
+        return canceled;
+    }
 
     private boolean confirmTransactionComplete() throws IOException {
         logger.debug("Confirming Transaction Complete for Peer {}", peerDescription);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index f3c4df2..f88e0ef 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -121,7 +121,24 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
     }
 
     public synchronized void unregister(final String connectionId) {
-        registeredPartitions.remove(connectionId);
+        final RegisteredPartition removedPartition = registeredPartitions.remove(connectionId);
+
+        if (removedPartition == null) {
+            logger.debug("{} Unregistered Connection with ID {} but there were no Registered Partitions", this, connectionId);
+            return;
+        }
+
+        logger.debug("{} Unregistered Connection with ID {}. Will fail any in-flight FlowFiles for Registered Partition {}", this, connectionId, removedPartition);
+        if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
+            // Attempt to cancel the session. If successful, trigger the failure callback for the partition.
+            // If not successful, it indicates that another thread has completed the session and is responsible for the transaction success/failure
+            if (loadBalanceSession.cancel()) {
+                final List<FlowFileRecord> flowFilesSent = loadBalanceSession.getFlowFilesSent();
+
+                logger.debug("{} Triggering failure callback for {} FlowFiles for Registered Partition {} because partition was unregistered", this, flowFilesSent.size(), removedPartition);
+                removedPartition.getFailureCallback().onTransactionFailed(flowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
+            }
+        }
     }
 
     public synchronized int getRegisteredConnectionCount() {
@@ -260,7 +277,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
                 anySuccess = anySuccess || success;
             } while (success);
 
-            if (loadBalanceSession.isComplete()) {
+            if (loadBalanceSession.isComplete() && !loadBalanceSession.isCanceled()) {
                 loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(), nodeIdentifier);
             }
 
@@ -306,31 +323,35 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
             }
 
             // Obtain a partition that needs to be rebalanced on failure
-            final RegisteredPartition readyPartition = getReadyPartition(partition -> partition.getFailureCallback().isRebalanceOnFailure());
+            final RegisteredPartition readyPartition = getReadyPartition(false, partition -> partition.getFailureCallback().isRebalanceOnFailure());
             if (readyPartition == null) {
                 return;
             }
 
             partitionQueue.offer(readyPartition); // allow partition to be obtained again
-            final TransactionThreshold threshold = newTransactionThreshold();
+            failFlowFiles(readyPartition);
+            penalize(); // Don't just transfer FlowFiles out of queue's partition as fast as possible, because the node may only be disconnected for a short time.
+        } finally {
+            loadBalanceSessionLock.unlock();
+        }
+    }
 
-            final List<FlowFileRecord> flowFiles = new ArrayList<>();
-            while (!threshold.isThresholdMet()) {
-                final FlowFileRecord flowFile = readyPartition.getFlowFileRecordSupplier().get();
-                if (flowFile == null) {
-                    break;
-                }
+    private void failFlowFiles(final RegisteredPartition partition) {
+        final TransactionThreshold threshold = newTransactionThreshold();
 
-                flowFiles.add(flowFile);
-                threshold.adjust(1, flowFile.getSize());
+        final List<FlowFileRecord> flowFiles = new ArrayList<>();
+        while (!threshold.isThresholdMet()) {
+            final FlowFileRecord flowFile = partition.getFlowFileRecordSupplier().get();
+            if (flowFile == null) {
+                break;
             }
 
-            logger.debug("Node {} not connected so failing {} FlowFiles for Load Balancing", nodeIdentifier, flowFiles.size());
-            readyPartition.getFailureCallback().onTransactionFailed(flowFiles, TransactionFailureCallback.TransactionPhase.SENDING);
-            penalize(); // Don't just transfer FlowFiles out of queue's partition as fast as possible, because the node may only be disconnected for a short time.
-        } finally {
-            loadBalanceSessionLock.unlock();
+            flowFiles.add(flowFile);
+            threshold.adjust(1, flowFile.getSize());
         }
+
+        logger.debug("Node {} not connected so failing {} FlowFiles for Load Balancing", nodeIdentifier, flowFiles.size());
+        partition.getFailureCallback().onTransactionFailed(flowFiles, TransactionFailureCallback.TransactionPhase.SENDING);
     }
 
     private synchronized LoadBalanceSession getFailoverSession() {
@@ -343,16 +364,16 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
 
 
     private RegisteredPartition getReadyPartition() {
-        return getReadyPartition(partition -> true);
+        return getReadyPartition(true, partition -> true);
     }
 
-    private synchronized RegisteredPartition getReadyPartition(final Predicate<RegisteredPartition> filter) {
+    private synchronized RegisteredPartition getReadyPartition(final boolean requireNodeConnected, final Predicate<RegisteredPartition> filter) {
         final List<RegisteredPartition> polledPartitions = new ArrayList<>();
 
         try {
             RegisteredPartition partition;
             while ((partition = partitionQueue.poll()) != null) {
-                if (partition.isEmpty() || partition.isPenalized() || !checkNodeConnected() || !filter.test(partition)) {
+                if (partition.isEmpty() || partition.isPenalized() || (requireNodeConnected && !checkNodeConnected(partition)) || !filter.test(partition)) {
                     polledPartitions.add(partition);
                     continue;
                 }
@@ -366,14 +387,14 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
         }
     }
 
-    private synchronized boolean checkNodeConnected() {
+    private synchronized boolean checkNodeConnected(final RegisteredPartition partition) {
         final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeIdentifier);
         final boolean connected = status != null && status.getState() == NodeConnectionState.CONNECTED;
 
         // If not connected but the last known state is connected, we know that the node has just transitioned to disconnected.
         // In this case we need to call #nodeDisconnected in order to allow for failover to take place
         if (!connected) {
-            nodeDisconnected();
+            failFlowFiles(partition);
         }
 
         return connected;

Reply via email to