[ 
https://issues.apache.org/jira/browse/NIFI-6787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16956162#comment-16956162
 ] 

Tamas Palfy commented on NIFI-6787:
-----------------------------------

Hi [~markap14],
I think neither of the Jiras you mentioned covers this issue. NIFI-6517 looks 
similar, but the root cause seems to be different.

I attached a template [^template.change_hostname_in_groovy_script.xml] which 
contains a simple GenerateFlowFile and an ExecuteScript with a Groovy script 
that sleeps if the hostname matches a given string. (You need to change that 
string to one of the hostnames in your cluster.)

I did a remote debug on a Cloudbreak cluster and saw that the connection was 
considered both empty and full at the same time.
These expressions all returned true:

{code:java}
// When checking at the GenerateFlowFile side
// (during ConnectableTask.invoke:
//   ...
//   if (!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
//   ...
// )
new ArrayList<>(connectable.getConnections(
        new ArrayList<>(connectable.getRelationships()).get(0)))
    .get(0).getFlowFileQueue().isActiveQueueEmpty()

new ArrayList<>(connectable.getConnections(
        new ArrayList<>(connectable.getRelationships()).get(0)))
    .get(0).getFlowFileQueue().isFull()

// When checking at the ExecuteScript side
((SocketLoadBalancedFlowFileQueue) ((StandardProcessorNode) connectable)
    .getIncomingNonLoopConnections().get(0).getFlowFileQueue()).isActiveQueueEmpty()

((SocketLoadBalancedFlowFileQueue) ((StandardProcessorNode) connectable)
    .getIncomingNonLoopConnections().get(0).getFlowFileQueue()).isFull()
{code}

(If you want to verify this you probably want to increase the sleep time in the 
groovy script to hours as remote debugging can be very slow.)
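
To make the "empty and full at the same time" state concrete, here is a minimal 
standalone Java model. It is not NiFi code: the class, its fields and the 10000 
threshold are made up for illustration. It assumes, as the issue description 
suggests, that the full check compares the cluster-wide total against the 
per-node limit while the empty check only looks at the local partition.

```java
// Standalone model for illustration only; none of these names exist in NiFi.
public class LoadBalancedQueueModel {
    private final int[] partitionCounts; // FlowFiles queued per node
    private final int localNode;         // index of the node doing the check
    private final int maxCountPerNode;   // back pressure threshold, scoped to one node

    public LoadBalancedQueueModel(int[] partitionCounts, int localNode, int maxCountPerNode) {
        this.partitionCounts = partitionCounts.clone();
        this.localNode = localNode;
        this.maxCountPerNode = maxCountPerNode;
    }

    // Sum of all partitions, i.e. the cluster-wide size.
    public int totalCount() {
        int total = 0;
        for (int count : partitionCounts) {
            total += count;
        }
        return total;
    }

    // Models the suspected behaviour: total size compared to a per-node limit.
    public boolean isFull() {
        return maxCountPerNode > 0 && totalCount() >= maxCountPerNode;
    }

    // Models the empty check, which only looks at the local partition.
    public boolean isActiveQueueEmpty() {
        return partitionCounts[localNode] == 0;
    }

    // Hypothetical contrast: a full check scoped to the local partition.
    public boolean isLocalPartitionFull() {
        return maxCountPerNode > 0 && partitionCounts[localNode] >= maxCountPerNode;
    }

    public static void main(String[] args) {
        // Node 0 is slow and has filled its partition; nodes 1 and 2 have drained theirs.
        LoadBalancedQueueModel fastNode =
                new LoadBalancedQueueModel(new int[] {10000, 0, 0}, 1, 10000);
        System.out.println("isActiveQueueEmpty = " + fastNode.isActiveQueueEmpty());
        System.out.println("isFull = " + fastNode.isFull());
        System.out.println("isLocalPartitionFull = " + fastNode.isLocalPartitionFull());
    }
}
```

On the fast node both `isActiveQueueEmpty()` and `isFull()` are true in this 
model, which matches what I saw in the debugger. The locally scoped check is 
only there for contrast; I am not claiming it is the right fix.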

> Load balanced connection can hold up for whole cluster when one node slows 
> down
> -------------------------------------------------------------------------------
>
>                 Key: NIFI-6787
>                 URL: https://issues.apache.org/jira/browse/NIFI-6787
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Tamas Palfy
>            Assignee: Tamas Palfy
>            Priority: Major
>         Attachments: template.change_hostname_in_groovy_script.xml
>
>
> A slow processor on one node in a cluster can slow down the same processor on 
> the other nodes when a load-balanced incoming connection is used:
> When a processor decides whether it can pass along a flowfile, it checks 
> whether the outgoing connection is full or not (if full, it yields).
> Similarly when the receiving processor is to be scheduled the framework 
> checks if the incoming connection is empty (if empty, no reason to call 
> {{onTrigger}}).
> The problem is that when checking if it's full or not, it checks the _total_ 
> size (which is across the whole cluster) and compares it to the max (which is 
> scoped to the current node).
> The empty check is (correctly) done on the local partition only.
> This can lead to the case where the slow node fills up its queue while the 
> faster ones empty theirs.
> Once the slow node has a full queue, the fast ones stop receiving new input 
> and thus stop working after their queues get emptied.
> The cause is probably that 
> {{SocketLoadBalancedFlowFileQueue.isFull}} actually delegates to 
> {{AbstractFlowFileQueue.isFull}}, which checks {{size()}}, but that returns 
> the _total_ size. (The empty check looks fine, but for reference it is done 
> via {{SocketLoadBalancedFlowFileQueue.isActiveQueueEmpty}}.)
> {code:title=AbstractFlowFileQueue.java|borderStyle=solid}
> ...
>     private MaxQueueSize getMaxQueueSize() {
>         return maxQueueSize.get();
>     }
>     @Override
>     public boolean isFull() {
>         return isFull(size());
>     }
>     protected boolean isFull(final QueueSize queueSize) {
>         final MaxQueueSize maxSize = getMaxQueueSize();
>         // Check if max size is set
>         if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
>             return false;
>         }
>         if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
>             return true;
>         }
>         if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
>             return true;
>         }
>         return false;
>     }
> ...
> {code}
> {code:title=SocketLoadBalancedFlowFileQueue.java|borderStyle=solid}
> ...
>     @Override
>     public QueueSize size() {
>         return totalSize.get();
>     }
> ...
>     @Override
>     public boolean isActiveQueueEmpty() {
>         return localPartition.isActiveQueueEmpty();
>     }
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
