[ https://issues.apache.org/jira/browse/NIFI-6787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16956162#comment-16956162 ]
Tamas Palfy commented on NIFI-6787:
-----------------------------------

Hi [~markap14],

I don't think either of the Jiras you mentioned covers this issue. NIFI-6517 looks similar, but its root cause seems to be different.

I attached a template [^template.change_hostname_in_groovy_script.xml] containing a simple GenerateFlowFile and an ExecuteScript running a Groovy script that sleeps if the hostname matches a given string. (You need to change that string to one of the hostnames in your cluster.)

I remote-debugged a Cloudbreak cluster and saw that the connection was considered both empty and full at the same time. These expressions all returned true:
{code:java}
// When checking on the GenerateFlowFile side
// (during ConnectableTask.invoke:
//     ...
//     if (!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
//     ...
// )
new ArrayList<>(connectable.getConnections(new ArrayList<>(connectable.getRelationships()).get(0))).get(0).getFlowFileQueue().isActiveQueueEmpty()
new ArrayList<>(connectable.getConnections(new ArrayList<>(connectable.getRelationships()).get(0))).get(0).getFlowFileQueue().isFull()

// When checking on the ExecuteScript side
((SocketLoadBalancedFlowFileQueue) ((StandardProcessorNode) connectable).getIncomingNonLoopConnections().get(0).getFlowFileQueue()).isActiveQueueEmpty()
((SocketLoadBalancedFlowFileQueue) ((StandardProcessorNode) connectable).getIncomingNonLoopConnections().get(0).getFlowFileQueue()).isFull()
{code}
(If you want to verify this, you will probably want to increase the sleep time in the Groovy script to hours, as remote debugging can be very slow.)
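The "empty and full at the same time" state can be reproduced with a small, self-contained Java model. This is a sketch under simplifying assumptions: `EmptyAndFullDemo` and `ToyLoadBalancedQueue` are hypothetical stand-ins, not NiFi classes; the model tracks only one FlowFile count per node (byte thresholds are ignored) and assumes a per-node object threshold of 10,000. As in the `SocketLoadBalancedFlowFileQueue` code quoted in the issue description below, `size()` returns the cluster-wide total, while `isActiveQueueEmpty()` looks only at the local partition.

```java
import java.util.Arrays;

public class EmptyAndFullDemo {

    // Hypothetical simplification of a load-balanced queue: one FlowFile count per node.
    static final class ToyLoadBalancedQueue {
        private final long[] countsPerNode;  // index = node id
        private final int localNode;         // node this queue instance runs on
        private final long maxCountPerNode;  // back-pressure threshold, scoped to one node

        ToyLoadBalancedQueue(long[] countsPerNode, int localNode, long maxCountPerNode) {
            this.countsPerNode = countsPerNode;
            this.localNode = localNode;
            this.maxCountPerNode = maxCountPerNode;
        }

        // Mirrors size(): the total across the whole cluster.
        long size() {
            return Arrays.stream(countsPerNode).sum();
        }

        // Mirrors isFull(): cluster-wide total compared against a per-node maximum.
        boolean isFull() {
            return maxCountPerNode > 0 && size() >= maxCountPerNode;
        }

        // Mirrors isActiveQueueEmpty(): only the local partition is consulted.
        boolean isActiveQueueEmpty() {
            return countsPerNode[localNode] == 0;
        }
    }

    public static void main(String[] args) {
        // Node 0 is slow and has backed up; nodes 1 and 2 are fast and have drained.
        long[] counts = {10_000, 0, 0};
        ToyLoadBalancedQueue onFastNode = new ToyLoadBalancedQueue(counts, 1, 10_000);

        System.out.println("empty=" + onFastNode.isActiveQueueEmpty()
                + " full=" + onFastNode.isFull());
    }
}
```

Running `main` prints `empty=true full=true`: on the fast node the upstream processor sees a full connection and yields, while the downstream processor sees an empty one and is not triggered, so that node stays idle until the slow node works through its share.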
> Load balanced connection can hold up for whole cluster when one node slows down
> -------------------------------------------------------------------------------
>
>                 Key: NIFI-6787
>                 URL: https://issues.apache.org/jira/browse/NIFI-6787
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Tamas Palfy
>            Assignee: Tamas Palfy
>            Priority: Major
>         Attachments: template.change_hostname_in_groovy_script.xml
>
>
> A slow processor on one node in a cluster can slow down the same processor on
> the other nodes when a load-balanced incoming connection is used:
> When a processor decides whether it can pass along a FlowFile, it checks
> whether the outgoing connection is full (if it is, the processor yields).
> Similarly, when the receiving processor is about to be scheduled, the framework
> checks whether the incoming connection is empty (if it is, there is no reason
> to call {{onTrigger}}).
> The problem is that the full check compares the _total_ size (summed across
> the whole cluster) with the maximum, which is scoped to the current node.
> The empty check is (correctly) done on the local partition only.
> This can lead to a situation where the slow node fills up its queue while the
> faster ones empty theirs.
> Once the slow node's queue is full, the fast nodes stop receiving new input
> and therefore go idle as soon as their own queues are drained.
> The cause is probably that {{SocketLoadBalancedFlowFileQueue.isFull}} actually
> resolves to {{AbstractFlowFileQueue.isFull}}, which checks {{size()}}, and that
> returns the _total_ size. (The empty check looks fine; for reference, it is
> done via {{SocketLoadBalancedFlowFileQueue.isActiveQueueEmpty}}.)
> {code:title=AbstractFlowFileQueue.java|borderStyle=solid}
> ...
>     private MaxQueueSize getMaxQueueSize() {
>         return maxQueueSize.get();
>     }
>
>     @Override
>     public boolean isFull() {
>         return isFull(size());
>     }
>
>     protected boolean isFull(final QueueSize queueSize) {
>         final MaxQueueSize maxSize = getMaxQueueSize();
>
>         // Check if max size is set
>         if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
>             return false;
>         }
>
>         if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
>             return true;
>         }
>
>         if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
>             return true;
>         }
>
>         return false;
>     }
> ...
> {code}
> {code:title=SocketLoadBalancedFlowFileQueue.java|borderStyle=solid}
> ...
>     @Override
>     public QueueSize size() {
>         return totalSize.get();
>     }
> ...
>     @Override
>     public boolean isActiveQueueEmpty() {
>         return localPartition.isActiveQueueEmpty();
>     }
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
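One way to make the back-pressure check agree with the empty check is to evaluate the same thresholds against the local partition's size instead of the cluster-wide total. The sketch below is a hypothetical illustration, not the change actually made for NIFI-6787: `SimpleQueueSize`, `SimpleMaxQueueSize`, and `NodeScopedQueue` are invented stand-ins for NiFi's `QueueSize`, `MaxQueueSize`, and `SocketLoadBalancedFlowFileQueue`, and the thresholds of 10,000 FlowFiles and 1 GiB are assumed values. Only `isFull(SimpleQueueSize)` mirrors real code: it copies the threshold logic of `AbstractFlowFileQueue.isFull(QueueSize)` quoted in the issue.

```java
public class NodeScopedIsFull {

    // Hypothetical stand-in for NiFi's QueueSize.
    static final class SimpleQueueSize {
        final long objectCount;
        final long byteCount;

        SimpleQueueSize(long objectCount, long byteCount) {
            this.objectCount = objectCount;
            this.byteCount = byteCount;
        }
    }

    // Hypothetical stand-in for NiFi's MaxQueueSize; a value <= 0 means "no limit".
    static final class SimpleMaxQueueSize {
        final long maxCount;
        final long maxBytes;

        SimpleMaxQueueSize(long maxCount, long maxBytes) {
            this.maxCount = maxCount;
            this.maxBytes = maxBytes;
        }
    }

    // Hypothetical stand-in for one node's view of a load-balanced connection.
    static final class NodeScopedQueue {
        private final SimpleQueueSize totalSize;   // summed across the whole cluster
        private final SimpleQueueSize localSize;   // this node's partition only
        private final SimpleMaxQueueSize maxSize;  // thresholds scoped to one node

        NodeScopedQueue(SimpleQueueSize totalSize, SimpleQueueSize localSize, SimpleMaxQueueSize maxSize) {
            this.totalSize = totalSize;
            this.localSize = localSize;
            this.maxSize = maxSize;
        }

        // Same threshold logic as AbstractFlowFileQueue.isFull(QueueSize).
        boolean isFull(SimpleQueueSize queueSize) {
            if (maxSize.maxBytes <= 0 && maxSize.maxCount <= 0) {
                return false; // no back pressure configured
            }
            if (maxSize.maxCount > 0 && queueSize.objectCount >= maxSize.maxCount) {
                return true;
            }
            return maxSize.maxBytes > 0 && queueSize.byteCount >= maxSize.maxBytes;
        }

        // Behaviour described in the issue: cluster-wide total vs. per-node limit.
        boolean isFullUsingTotal() {
            return isFull(totalSize);
        }

        // Hypothetical alternative: same partition scope as isActiveQueueEmpty().
        boolean isFullUsingLocal() {
            return isFull(localSize);
        }
    }

    public static void main(String[] args) {
        // Assumed thresholds: 10,000 FlowFiles or 1 GiB per node.
        SimpleMaxQueueSize max = new SimpleMaxQueueSize(10_000, 1L << 30);
        // The slow node holds 10,000 FlowFiles of 1 KiB each; this fast node holds none.
        NodeScopedQueue fastNode = new NodeScopedQueue(
                new SimpleQueueSize(10_000, 10_000L * 1024),
                new SimpleQueueSize(0, 0),
                max);
        System.out.println("total-based full=" + fastNode.isFullUsingTotal()
                + " local-based full=" + fastNode.isFullUsingLocal());
    }
}
```

Running `main` prints `total-based full=true local-based full=false`. Note the trade-off: a check scoped only to the local partition no longer counts FlowFiles already assigned to other nodes' partitions, so whether and how those should be bounded is a separate design question this sketch does not address.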