This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.11.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit c40a8d4e5db407d72622dd94e33527d3441ecfae Author: Mark Payne <[email protected]> AuthorDate: Wed Jan 22 12:05:03 2020 -0500 NIFI-7059: This closes #4007. Fixed bug that results in priorities not properly being set in the SocketLoadBalancedFlowFileQueue. Even though the queue's setPriorities method was called, the underlying queuePartitions array may not have contained the localPartition. As a result, when setPriorities() was called, it did not properly delegate that call to the localPartition. As a result, the queue knew that the Prioritizers were set but the local queue did not apply them. This happened due to a race condition between queue cr [...] Signed-off-by: Joe Witt <[email protected]> --- .../coordination/node/NodeClusterCoordinator.java | 2 + .../controller/queue/AbstractFlowFileQueue.java | 56 ++++++++--- .../nifi/controller/queue/QueuePrioritizer.java | 9 +- .../clustered/SocketLoadBalancedFlowFileQueue.java | 22 +++-- .../TestSocketLoadBalancedFlowFileQueue.java | 104 ++++++++++++++++++--- 5 files changed, 157 insertions(+), 36 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 888f970..e428046 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -259,6 +259,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl this.nodeId = nodeId; nodeStatuses.computeIfAbsent(nodeId.getId(), id -> new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)); 
eventListeners.forEach(listener -> listener.onLocalNodeIdentifierSet(nodeId)); + + storeState(); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java index 8b29613..09dc670 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java @@ -44,6 +44,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class AbstractFlowFileQueue implements FlowFileQueue { private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class); @@ -62,6 +65,10 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue { private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE; private String partitioningAttribute = null; + private final ReadWriteLock loadBalanceRWLock = new ReentrantReadWriteLock(); + private final Lock loadBalanceReadLock = loadBalanceRWLock.readLock(); + private final Lock loadBalanceWriteLock = loadBalanceRWLock.writeLock(); + private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS; @@ -423,32 +430,57 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue { } @Override - public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, 
final String partitioningAttribute) { - if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) { - throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute"); - } + public void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) { + loadBalanceWriteLock.lock(); + try { + if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) { + throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute"); + } - this.loadBalanceStrategy = strategy; - this.partitioningAttribute = partitioningAttribute; + this.loadBalanceStrategy = strategy; + this.partitioningAttribute = partitioningAttribute; + } finally { + loadBalanceWriteLock.unlock(); + } } @Override - public synchronized String getPartitioningAttribute() { - return partitioningAttribute; + public String getPartitioningAttribute() { + loadBalanceReadLock.lock(); + try { + return partitioningAttribute; + } finally { + loadBalanceReadLock.unlock(); + } } @Override - public synchronized LoadBalanceStrategy getLoadBalanceStrategy() { - return loadBalanceStrategy; + public LoadBalanceStrategy getLoadBalanceStrategy() { + loadBalanceReadLock.lock(); + try { + return loadBalanceStrategy; + } finally { + loadBalanceReadLock.unlock(); + } } @Override public synchronized void setLoadBalanceCompression(final LoadBalanceCompression compression) { - this.compression = compression; + loadBalanceWriteLock.lock(); + try { + this.compression = compression; + } finally { + loadBalanceWriteLock.unlock(); + } } @Override public synchronized LoadBalanceCompression getLoadBalanceCompression() { - return compression; + loadBalanceReadLock.lock(); + try { + return compression; + } finally { + loadBalanceReadLock.unlock(); + } } } 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java index b78ccff..7c2c1a8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java @@ -17,15 +17,15 @@ package org.apache.nifi.controller.queue; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.flowfile.FlowFilePrioritizer; + import java.io.Serializable; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.flowfile.FlowFilePrioritizer; - public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializable { private static final long serialVersionUID = 1L; private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>(); @@ -68,6 +68,7 @@ public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializabl final ContentClaim claim1 = f1.getContentClaim(); final ContentClaim claim2 = f2.getContentClaim(); + // put the one without a claim first if (claim1 == null && claim2 != null) { return -1; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 23a3788..837a3f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -145,19 +145,25 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress)); if (sortedNodeIdentifiers.isEmpty()) { + // No Node Identifiers are known yet. Just create the partitions using the local partition. queuePartitions = new QueuePartition[] { localPartition }; } else { - queuePartitions = new QueuePartition[sortedNodeIdentifiers.size()]; - - for (int i = 0; i < sortedNodeIdentifiers.size(); i++) { - final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i); - if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { - queuePartitions[i] = localPartition; - } else { - queuePartitions[i] = createRemotePartition(nodeId); + // The node identifiers are known. Create the partitions using the local partition and 1 Remote Partition for each node + // that is not the local node identifier. If the Local Node Identifier is not yet known, that's okay. When it becomes known, + // the queuePartitions array will be recreated with the appropriate partitions. 
+ final List<QueuePartition> partitionList = new ArrayList<>(); + partitionList.add(localPartition); + + final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier(); + for (final NodeIdentifier nodeId : sortedNodeIdentifiers) { + if (nodeId.equals(localNodeId)) { + continue; } + + partitionList.add(createRemotePartition(nodeId)); } + queuePartitions = partitionList.toArray(new QueuePartition[0]); } partitioner = new LocalPartitionPartitioner(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java index e7d521c..351429d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java @@ -37,6 +37,8 @@ import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.junit.Assert; import org.junit.Before; @@ -136,6 +138,78 @@ public class TestSocketLoadBalancedFlowFileQueue { "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet()); } + + @Test + public void testPriorities() { + final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() { + @Override + 
public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + queue.setPriorities(Collections.singletonList(iValuePrioritizer)); + + final Map<String, String> attributes = new HashMap<>(); + + // Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.) + for (int i = 99; i >= 0; i--) { + attributes.put("i", String.valueOf(i)); + final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L); + queue.put(flowFile); + } + + for (int i=0; i < 100; i++) { + final FlowFileRecord polled = queue.poll(Collections.emptySet()); + assertNotNull(polled); + assertEquals(String.valueOf(i), polled.getAttribute("i")); + } + + assertNull(queue.poll(Collections.emptySet())); + } + + @Test + public void testPrioritiesWhenSetBeforeLocalNodeIdDetermined() { + final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + final ProcessScheduler scheduler = mock(ProcessScheduler.class); + final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class); + when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null); + + queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo, + contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter); + queue.setPriorities(Collections.singletonList(iValuePrioritizer)); + + when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null); + queue.setNodeIdentifiers(new HashSet<>(nodeIds), true); + + final Map<String, String> attributes = new HashMap<>(); + + // 
Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.) + for (int i = 99; i >= 0; i--) { + attributes.put("i", String.valueOf(i)); + final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L); + queue.put(flowFile); + } + + for (int i=0; i < 100; i++) { + final FlowFileRecord polled = queue.poll(Collections.emptySet()); + assertNotNull(polled); + assertEquals(String.valueOf(i), polled.getAttribute("i")); + } + + assertNull(queue.poll(Collections.emptySet())); + } + @Test public void testBinsAccordingToPartitioner() { final FlowFilePartitioner partitioner = new StaticFlowFilePartitioner(1); @@ -395,8 +469,8 @@ public class TestSocketLoadBalancedFlowFileQueue { assertPartitionSizes(expectedPartitionSizes); } - @Test(timeout = 100000) - public void testLocalNodeIdentifierSet() throws InterruptedException { + @Test(timeout = 10000) + public void testDataInRemotePartitionForLocalIdIsMovedToLocalPartition() throws InterruptedException { nodeIds.clear(); final NodeIdentifier id1 = createNodeIdentifier(); @@ -410,7 +484,7 @@ public class TestSocketLoadBalancedFlowFileQueue { final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class); queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), mock(ProcessScheduler.class), flowFileRepo, provRepo, - contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter); + contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter); queue.setFlowFilePartitioner(new RoundRobinPartitioner()); @@ -421,22 +495,28 @@ public class TestSocketLoadBalancedFlowFileQueue { queue.put(new MockFlowFileRecord(attributes, 0)); } - for (int i=0; i < 3; i++) { - assertEquals(2, queue.getPartition(i).size().getObjectCount()); - } - - assertEquals(0, queue.getLocalPartition().size().getObjectCount()); - 
when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(id1); clusterTopologyEventListener.onLocalNodeIdentifierSet(id1); - assertPartitionSizes(new int[] {2, 2, 2}); + assertEquals(6, queue.size().getObjectCount()); - while (queue.getLocalPartition().size().getObjectCount() != 2) { - Thread.sleep(10L); + // Ensure that the partitions' object sizes add up to 6. This could take a short time because rebalancing will occur. + // So we wait in a loop. + while (true) { + int totalObjectCount = 0; + for (int i = 0; i < queue.getPartitionCount(); i++) { + totalObjectCount += queue.getPartition(i).size().getObjectCount(); + } + + if (totalObjectCount == 6) { + break; + } } + + assertEquals(3, queue.getPartitionCount()); } + private void assertPartitionSizes(final int[] expectedSizes) { final int[] partitionSizes = new int[queue.getPartitionCount()]; while (!Arrays.equals(expectedSizes, partitionSizes)) {
