This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f95044f NIFI-9433: When a Connection is unregistered from the
NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction
if the transaction belongs to the appropriate connection. Also ensure that when
we do cancel a transaction / call its failure callback, we purge the collection
of any FlowFiles that have been sent in that transaction. This ensures that we
cannot later attempt to failure the transaction again, decrementing the count
of FlowFiles for the con [...]
f95044f is described below
commit f95044ff877004e4b6cd3c0d399525ae9a07cfb0
Author: Mark Payne <[email protected]>
AuthorDate: Thu Dec 2 11:21:36 2021 -0500
NIFI-9433: When a Connection is unregistered from the
NioAsyncLoadBalanceClient, make sure that we only cancel its active transaction
if the transaction belongs to the appropriate connection. Also ensure that when
we do cancel a transaction / call its failure callback, we purge the collection
of any FlowFiles that have been sent in that transaction. This ensures that we
cannot later attempt to failure the transaction again, decrementing the count
of FlowFiles for the connection more t [...]
Signed-off-by: Joe Gresock <[email protected]>
This closes #5564.
---
.../clustered/client/async/nio/LoadBalanceSession.java | 7 ++++---
.../client/async/nio/NioAsyncLoadBalanceClient.java | 13 +++++++------
.../clustered/client/async/nio/TestLoadBalanceSession.java | 4 ++--
3 files changed, 13 insertions(+), 11 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
index 011558b..4178e55 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
@@ -43,7 +43,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
@@ -119,8 +118,10 @@ public class LoadBalanceSession {
return phase.getRequiredSelectionKey();
}
- public synchronized List<FlowFileRecord> getFlowFilesSent() {
- return Collections.unmodifiableList(flowFilesSent);
+ public synchronized List<FlowFileRecord> getAndPurgeFlowFilesSent() {
+ final List<FlowFileRecord> copy = new ArrayList<>(flowFilesSent);
+ flowFilesSent.clear();
+ return copy;
}
public synchronized boolean isComplete() {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index f88e0ef..a322b24 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -129,11 +129,12 @@ public class NioAsyncLoadBalanceClient implements
AsyncLoadBalanceClient {
}
logger.debug("{} Unregistered Connection with ID {}. Will fail any
in-flight FlowFiles for Registered Partition {}", this, connectionId,
removedPartition);
- if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
+ final boolean validSession = loadBalanceSession != null &&
connectionId.equals(loadBalanceSession.getPartition().getConnectionId());
+ if (validSession && !loadBalanceSession.isComplete()) {
// Attempt to cancel the session. If successful, trigger the
failure callback for the partition.
// If not successful, it indicates that another thread has
completed the session and is responsible or the transaction success/failure
if (loadBalanceSession.cancel()) {
- final List<FlowFileRecord> flowFilesSent =
loadBalanceSession.getFlowFilesSent();
+ final List<FlowFileRecord> flowFilesSent =
loadBalanceSession.getAndPurgeFlowFilesSent();
logger.debug("{} Triggering failure callback for {} FlowFiles
for Registered Partition {} because partition was unregistered", this,
flowFilesSent.size(), removedPartition);
removedPartition.getFailureCallback().onTransactionFailed(flowFilesSent,
TransactionFailureCallback.TransactionPhase.SENDING);
@@ -268,7 +269,7 @@ public class NioAsyncLoadBalanceClient implements
AsyncLoadBalanceClient {
loadBalanceSession.getPartition().getConnectionId() +
" due to " + e);
penalize();
-
loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getFlowFilesSent(),
e, TransactionFailureCallback.TransactionPhase.SENDING);
+
loadBalanceSession.getPartition().getFailureCallback().onTransactionFailed(loadBalanceSession.getAndPurgeFlowFilesSent(),
e, TransactionFailureCallback.TransactionPhase.SENDING);
close();
return false;
@@ -278,7 +279,7 @@ public class NioAsyncLoadBalanceClient implements
AsyncLoadBalanceClient {
} while (success);
if (loadBalanceSession.isComplete() &&
!loadBalanceSession.isCanceled()) {
-
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(),
nodeIdentifier);
+
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getAndPurgeFlowFilesSent(),
nodeIdentifier);
}
return anySuccess;
@@ -311,10 +312,10 @@ public class NioAsyncLoadBalanceClient implements
AsyncLoadBalanceClient {
loadBalanceSession = null;
logger.debug("Node {} disconnected so will terminate the Load
Balancing Session", nodeIdentifier);
- final List<FlowFileRecord> flowFilesSent =
session.getFlowFilesSent();
+ final List<FlowFileRecord> flowFilesSent =
session.getAndPurgeFlowFilesSent();
if (!flowFilesSent.isEmpty()) {
-
session.getPartition().getFailureCallback().onTransactionFailed(session.getFlowFilesSent(),
TransactionFailureCallback.TransactionPhase.SENDING);
+
session.getPartition().getFailureCallback().onTransactionFailed(flowFilesSent,
TransactionFailureCallback.TransactionPhase.SENDING);
}
close();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
index 2ae0f0b..43a3cfe 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -190,7 +190,7 @@ public class TestLoadBalanceSession {
assertArrayEquals(expectedSent, dataSent);
- assertEquals(Arrays.asList(flowFile1, flowFile2),
transaction.getFlowFilesSent());
+ assertEquals(Arrays.asList(flowFile1, flowFile2),
transaction.getAndPurgeFlowFilesSent());
}
@@ -271,6 +271,6 @@ public class TestLoadBalanceSession {
assertArrayEquals(expectedSent, dataSent);
- assertEquals(Arrays.asList(flowFile1), transaction.getFlowFilesSent());
+ assertEquals(Arrays.asList(flowFile1),
transaction.getAndPurgeFlowFilesSent());
}
}