This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ab8b744 NIFI-8457: Fixed bug in load balanced connections that can
result in the node never completing OFFLOAD action. Also fixed issue in which
data destined for a disconnected/offloaded node was never rebalanced even for
partitioning strategies that call for rebalancing on failure
ab8b744 is described below
commit ab8b7444b5587adfc7548a1da99c57040dc418c0
Author: Mark Payne <[email protected]>
AuthorDate: Wed Apr 21 16:57:39 2021 -0400
NIFI-8457: Fixed bug in load balanced connections that can result in the
node never completing OFFLOAD action. Also fixed issue in which data destined
for a disconnected/offloaded node was never rebalanced even for partitioning
strategies that call for rebalancing on failure
Signed-off-by: Pierre Villard <[email protected]>
This closes #5019.
---
.../client/async/nio/LoadBalanceSession.java | 14 +++++
.../async/nio/NioAsyncLoadBalanceClient.java | 65 ++++++++++++++--------
2 files changed, 57 insertions(+), 22 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
index 0169d07..011558b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
@@ -76,6 +76,7 @@ public class LoadBalanceSession {
private final String peerDescription;
private final String connectionId;
private final TransactionThreshold transactionThreshold;
+ private volatile boolean canceled = false;
final VersionNegotiator negotiator = new StandardVersionNegotiator(1);
private int protocolVersion = 1;
@@ -170,6 +171,19 @@ public class LoadBalanceSession {
}
}
+ public synchronized boolean cancel() {
+ if (complete) {
+ return false;
+ }
+
+ complete = true;
+ canceled = true;
+ return true;
+ }
+
+ public boolean isCanceled() {
+ return canceled;
+ }
private boolean confirmTransactionComplete() throws IOException {
logger.debug("Confirming Transaction Complete for Peer {}",
peerDescription);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index f3c4df2..f88e0ef 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -121,7 +121,24 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
}
public synchronized void unregister(final String connectionId) {
- registeredPartitions.remove(connectionId);
+ final RegisteredPartition removedPartition =
registeredPartitions.remove(connectionId);
+
+ if (removedPartition == null) {
+ logger.debug("{} Unregistered Connection with ID {} but there were
no Registered Partitions", this, connectionId);
+ return;
+ }
+
+ logger.debug("{} Unregistered Connection with ID {}. Will fail any
in-flight FlowFiles for Registered Partition {}", this, connectionId,
removedPartition);
+ if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
+ // Attempt to cancel the session. If successful, trigger the
failure callback for the partition.
+ // If not successful, it indicates that another thread has
completed the session and is responsible or the transaction success/failure
+ if (loadBalanceSession.cancel()) {
+ final List<FlowFileRecord> flowFilesSent =
loadBalanceSession.getFlowFilesSent();
+
+ logger.debug("{} Triggering failure callback for {} FlowFiles
for Registered Partition {} because partition was unregistered", this,
flowFilesSent.size(), removedPartition);
+
removedPartition.getFailureCallback().onTransactionFailed(flowFilesSent,
TransactionFailureCallback.TransactionPhase.SENDING);
+ }
+ }
}
public synchronized int getRegisteredConnectionCount() {
@@ -260,7 +277,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
anySuccess = anySuccess || success;
} while (success);
- if (loadBalanceSession.isComplete()) {
+ if (loadBalanceSession.isComplete() &&
!loadBalanceSession.isCanceled()) {
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(),
nodeIdentifier);
}
@@ -306,31 +323,35 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
}
// Obtain a partition that needs to be rebalanced on failure
- final RegisteredPartition readyPartition =
getReadyPartition(partition ->
partition.getFailureCallback().isRebalanceOnFailure());
+ final RegisteredPartition readyPartition =
getReadyPartition(false, partition ->
partition.getFailureCallback().isRebalanceOnFailure());
if (readyPartition == null) {
return;
}
partitionQueue.offer(readyPartition); // allow partition to be
obtained again
- final TransactionThreshold threshold = newTransactionThreshold();
+ failFlowFiles(readyPartition);
+ penalize(); // Don't just transfer FlowFiles out of queue's
partition as fast as possible, because the node may only be disconnected for a
short time.
+ } finally {
+ loadBalanceSessionLock.unlock();
+ }
+ }
- final List<FlowFileRecord> flowFiles = new ArrayList<>();
- while (!threshold.isThresholdMet()) {
- final FlowFileRecord flowFile =
readyPartition.getFlowFileRecordSupplier().get();
- if (flowFile == null) {
- break;
- }
+ private void failFlowFiles(final RegisteredPartition partition) {
+ final TransactionThreshold threshold = newTransactionThreshold();
- flowFiles.add(flowFile);
- threshold.adjust(1, flowFile.getSize());
+ final List<FlowFileRecord> flowFiles = new ArrayList<>();
+ while (!threshold.isThresholdMet()) {
+ final FlowFileRecord flowFile =
partition.getFlowFileRecordSupplier().get();
+ if (flowFile == null) {
+ break;
}
- logger.debug("Node {} not connected so failing {} FlowFiles for
Load Balancing", nodeIdentifier, flowFiles.size());
- readyPartition.getFailureCallback().onTransactionFailed(flowFiles,
TransactionFailureCallback.TransactionPhase.SENDING);
- penalize(); // Don't just transfer FlowFiles out of queue's
partition as fast as possible, because the node may only be disconnected for a
short time.
- } finally {
- loadBalanceSessionLock.unlock();
+ flowFiles.add(flowFile);
+ threshold.adjust(1, flowFile.getSize());
}
+
+ logger.debug("Node {} not connected so failing {} FlowFiles for Load
Balancing", nodeIdentifier, flowFiles.size());
+ partition.getFailureCallback().onTransactionFailed(flowFiles,
TransactionFailureCallback.TransactionPhase.SENDING);
}
private synchronized LoadBalanceSession getFailoverSession() {
@@ -343,16 +364,16 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
private RegisteredPartition getReadyPartition() {
- return getReadyPartition(partition -> true);
+ return getReadyPartition(true, partition -> true);
}
- private synchronized RegisteredPartition getReadyPartition(final
Predicate<RegisteredPartition> filter) {
+ private synchronized RegisteredPartition getReadyPartition(final boolean
requireNodeConnected, final Predicate<RegisteredPartition> filter) {
final List<RegisteredPartition> polledPartitions = new ArrayList<>();
try {
RegisteredPartition partition;
while ((partition = partitionQueue.poll()) != null) {
- if (partition.isEmpty() || partition.isPenalized() ||
!checkNodeConnected() || !filter.test(partition)) {
+ if (partition.isEmpty() || partition.isPenalized() ||
(requireNodeConnected && !checkNodeConnected(partition)) ||
!filter.test(partition)) {
polledPartitions.add(partition);
continue;
}
@@ -366,14 +387,14 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
}
}
- private synchronized boolean checkNodeConnected() {
+ private synchronized boolean checkNodeConnected(final RegisteredPartition
partition) {
final NodeConnectionStatus status =
clusterCoordinator.getConnectionStatus(nodeIdentifier);
final boolean connected = status != null && status.getState() ==
NodeConnectionState.CONNECTED;
// If not connected but the last known state is connected, we know
that the node has just transitioned to disconnected.
// In this case we need to call #nodeDisconnected in order to allow
for failover to take place
if (!connected) {
- nodeDisconnected();
+ failFlowFiles(partition);
}
return connected;