[ 
https://issues.apache.org/jira/browse/OOZIE-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shwetha G S updated OOZIE-1664:
-------------------------------

    Description: 
In CallableQueueService:
{noformat}
            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {

                @Override
                protected boolean eligibleToPoll(QueueElement<?> element) {
                    if (element != null) {
                        CallableWrapper wrapper = (CallableWrapper) element;
                        if (element.getElement() != null) {
                            return callableReachMaxConcurrency(wrapper.getElement());
                        }
                    }
                    return false;
                }

            };
{noformat}

eligibleToPoll() does not check whether the element's delay has elapsed 
(that is, whether the remaining delay is zero or negative). 
PollablePriorityDelayQueue.poll() iterates over all elements and returns the 
first element whose concurrency is not maxed. The returned element can 
therefore have a positive remaining delay: it is not yet due for execution, 
but it is picked up for execution anyway. The loop in poll():
{noformat}
                    Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
                    while(e == null && iter.hasNext()) {
                        e = iter.next();
                        if (eligibleToPoll(e)) {
                            queues[i - 1].remove(e);
                        }
                        else {
                            debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
                            e = null;
                        }
                    }
{noformat}
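
One possible direction for a fix, sketched with a simplified stand-in rather 
than Oozie's actual classes: have poll() skip any element whose remaining 
delay is still positive before it consults eligibleToPoll(). The names 
DelayAwarePollSketch, Item, and the Predicate-based eligibleToPoll parameter 
are hypothetical and exist only for illustration; java.util.concurrent.Delayed 
is the only real interface relied on here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical stand-in for the queue; NOT Oozie's PollablePriorityDelayQueue.
final class DelayAwarePollSketch {

    /** Hypothetical element: a name plus the absolute time at which it becomes due. */
    static final class Item implements Delayed {
        final String name;
        final long dueAtMillis;

        Item(String name, long delayMillis) {
            this.name = name;
            this.dueAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Positive while the element is not yet due, zero or negative once it is.
            return unit.convert(dueAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Removes and returns the first element that is both due and eligible, or null if none is. */
    static Item poll(List<Item> queue, Predicate<Item> eligibleToPoll) {
        Iterator<Item> iter = queue.iterator();
        while (iter.hasNext()) {
            Item candidate = iter.next();
            // The delay check comes first: an element with a positive delay is never returned.
            boolean due = candidate.getDelay(TimeUnit.MILLISECONDS) <= 0;
            if (due && eligibleToPoll.test(candidate)) {
                iter.remove();
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Item> queue = new ArrayList<>();
        queue.add(new Item("not-yet-due", 60_000)); // due in one minute
        queue.add(new Item("due", -1));             // delay already elapsed
        Item polled = poll(queue, item -> true);    // concurrency check always passes here
        System.out.println(polled.name + ", remaining=" + queue.size());
    }
}
```

In this sketch the element with the 60-second delay stays in the queue even 
though the eligibility check alone would accept it, which is the behaviour the 
report says is missing.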

  was:
In CallableQueueService:
{noformat}
            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {

                @Override
                protected boolean eligibleToPoll(QueueElement<?> element) {
                    if (element != null) {
                        CallableWrapper wrapper = (CallableWrapper) element;
                        if (element.getElement() != null) {
                            return callableReachMaxConcurrency(wrapper.getElement());
                        }
                    }
                    return false;
                }

            };
{noformat}

eligibleToPoll() does not check whether the delay is negative, and 
PollablePriorityDelayQueue.poll() iterates over all elements and returns any 
element whose concurrency is not maxed
{noformat}
                    Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
                    while(e == null && iter.hasNext()) {
                        e = iter.next();
                        if (eligibleToPoll(e)) {
                            queues[i - 1].remove(e);
                        }
                        else {
                            debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
                            e = null;
                        }
                    }
{noformat}


> PollablePriorityDelayQueue.poll() returns elements with +ve delay
> -----------------------------------------------------------------
>
>                 Key: OOZIE-1664
>                 URL: https://issues.apache.org/jira/browse/OOZIE-1664
>             Project: Oozie
>          Issue Type: Bug
>            Reporter: Shwetha G S
>
> In CallableQueueService:
> {noformat}
>             queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
>                 @Override
>                 protected boolean eligibleToPoll(QueueElement<?> element) {
>                     if (element != null) {
>                         CallableWrapper wrapper = (CallableWrapper) element;
>                         if (element.getElement() != null) {
>                             return callableReachMaxConcurrency(wrapper.getElement());
>                         }
>                     }
>                     return false;
>                 }
>             };
> {noformat}
> eligibleToPoll() does not check whether the element's delay has elapsed 
> (that is, whether the remaining delay is zero or negative). 
> PollablePriorityDelayQueue.poll() iterates over all elements and returns the 
> first element whose concurrency is not maxed. The returned element can 
> therefore have a positive remaining delay: it is not yet due for execution, 
> but it is picked up for execution anyway. The loop in poll():
> {noformat}
>                     Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
>                     while(e == null && iter.hasNext()) {
>                         e = iter.next();
>                         if (eligibleToPoll(e)) {
>                             queues[i - 1].remove(e);
>                         }
>                         else {
>                             debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
>                             e = null;
>                         }
>                     }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
