This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f95044f  NIFI-9433: When a Connection is unregistered from the 
NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction 
if the transaction belongs to the appropriate connection. Also ensure that when 
we do cancel a transaction / call its failure callback, we purge the collection 
of any FlowFiles that have been sent in that transaction. This ensures that we 
cannot later attempt to fail the transaction again, decrementing the count 
of FlowFiles for the con [...]
f95044f is described below

commit f95044ff877004e4b6cd3c0d399525ae9a07cfb0
Author: Mark Payne <[email protected]>
AuthorDate: Thu Dec 2 11:21:36 2021 -0500

    NIFI-9433: When a Connection is unregistered from the 
NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction 
if the transaction belongs to the appropriate connection. Also ensure that when 
we do cancel a transaction / call its failure callback, we purge the collection 
of any FlowFiles that have been sent in that transaction. This ensures that we 
cannot later attempt to fail the transaction again, decrementing the count 
of FlowFiles for the connection more t [...]
    
    Signed-off-by: Joe Gresock <[email protected]>
    
    This closes #5564.
---
 .../clustered/client/async/nio/LoadBalanceSession.java      |  7 ++++---
 .../client/async/nio/NioAsyncLoadBalanceClient.java         | 13 +++++++------
 .../clustered/client/async/nio/TestLoadBalanceSession.java  |  4 ++--
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
index 011558b..4178e55 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
@@ -43,7 +43,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.OptionalInt;
 import java.util.concurrent.TimeUnit;
@@ -119,8 +118,10 @@ public class LoadBalanceSession {
         return phase.getRequiredSelectionKey();
     }
 
-    public synchronized List<FlowFileRecord> getFlowFilesSent() {
-        return Collections.unmodifiableList(flowFilesSent);
+    public synchronized List<FlowFileRecord> getAndPurgeFlowFilesSent() {
+        final List<FlowFileRecord> copy = new ArrayList<>(flowFilesSent);
+        flowFilesSent.clear();
+        return copy;
     }
 
     public synchronized boolean isComplete() {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index f88e0ef..a322b24 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -129,11 +129,12 @@ public class NioAsyncLoadBalanceClient implements 
AsyncLoadBalanceClient {
         }
 
         logger.debug("{} Unregistered Connection with ID {}. Will fail any 
in-flight FlowFiles for Registered Partition {}", this, connectionId, 
removedPartition);
-        if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
+        final boolean validSession = loadBalanceSession != null && 
connectionId.equals(loadBalanceSession.getPartition().getConnectionId());
+        if (validSession && !loadBalanceSession.isComplete()) {
             // Attempt to cancel the session. If successful, trigger the 
failure callback for the partition.
             // If not successful, it indicates that another thread has 
completed the session and is responsible or the transaction success/failure
             if (loadBalanceSession.cancel()) {
-                final List<FlowFileRecord> flowFilesSent = 
loadBalanceSession.getFlowFilesSent();
+                final List<FlowFileRecord> flowFilesSent = 
loadBalanceSession.getAndPurgeFlowFilesSent();
 
                 logger.debug("{} Triggering failure callback for {} FlowFiles 
for Registered Partition {} because partition was unregistered", this, 
flowFilesSent.size(), removedPartition);
                 
removedPartition.getFailureCallback().onTransactionFailed(flowFilesSent, 
TransactionFailureCallback.TransactionPhase.SENDING);
@@ -268,7 +269,7 @@ public class NioAsyncLoadBalanceClient implements 
AsyncLoadBalanceClient {
                         loadBalanceSession.getPartition().getConnectionId() + 
" due to " + e);
 
                     penalize();
-                    
loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getFlowFilesSent(),
 e, TransactionFailureCallback.TransactionPhase.SENDING);
+                    
loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getAndPurgeFlowFilesSent(),
 e, TransactionFailureCallback.TransactionPhase.SENDING);
                     close();
 
                     return false;
@@ -278,7 +279,7 @@ public class NioAsyncLoadBalanceClient implements 
AsyncLoadBalanceClient {
             } while (success);
 
             if (loadBalanceSession.isComplete() && 
!loadBalanceSession.isCanceled()) {
-                
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(),
 nodeIdentifier);
+                
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getAndPurgeFlowFilesSent(),
 nodeIdentifier);
             }
 
             return anySuccess;
@@ -311,10 +312,10 @@ public class NioAsyncLoadBalanceClient implements 
AsyncLoadBalanceClient {
                 loadBalanceSession = null;
 
                 logger.debug("Node {} disconnected so will terminate the Load 
Balancing Session", nodeIdentifier);
-                final List<FlowFileRecord> flowFilesSent = 
session.getFlowFilesSent();
+                final List<FlowFileRecord> flowFilesSent = 
session.getAndPurgeFlowFilesSent();
 
                 if (!flowFilesSent.isEmpty()) {
-                    
session.getPartition().getFailureCallback().onTransactionFailed(session.getFlowFilesSent(),
 TransactionFailureCallback.TransactionPhase.SENDING);
+                    
session.getPartition().getFailureCallback().onTransactionFailed(flowFilesSent, 
TransactionFailureCallback.TransactionPhase.SENDING);
                 }
 
                 close();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
index 2ae0f0b..43a3cfe 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -190,7 +190,7 @@ public class TestLoadBalanceSession {
 
         assertArrayEquals(expectedSent, dataSent);
 
-        assertEquals(Arrays.asList(flowFile1, flowFile2), 
transaction.getFlowFilesSent());
+        assertEquals(Arrays.asList(flowFile1, flowFile2), 
transaction.getAndPurgeFlowFilesSent());
     }
 
 
@@ -271,6 +271,6 @@ public class TestLoadBalanceSession {
 
         assertArrayEquals(expectedSent, dataSent);
 
-        assertEquals(Arrays.asList(flowFile1), transaction.getFlowFilesSent());
+        assertEquals(Arrays.asList(flowFile1), 
transaction.getAndPurgeFlowFilesSent());
     }
 }

Reply via email to