[ 
https://issues.apache.org/jira/browse/OOZIE-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13867006#comment-13867006
 ] 

Rohini Palaniswamy commented on OOZIE-1664:
-------------------------------------------

I don't think this is an issue, and I believe the jira is invalid. Queueing 
elements with a delay is one of the basic features, and if it were broken we 
should be seeing at least some unit test failures. QueueElement<E> implements 
Delayed, and queues[i - 1].poll() below is 
java.util.concurrent.DelayQueue.poll():

{code}
e = queues[i - 1].peek();
if (eligibleToPoll(e)) {
    e = queues[i - 1].poll();
}
{code}
Though peek() returns the first element in the queue regardless of its delay 
(return (E) queue[0];), poll() returns null if that element's delay has not 
yet elapsed, and that null is already handled in the Oozie code.

DelayQueue.java
{code}
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
            return null;
        else {
            E x = q.poll();
            assert x != null;
            if (q.size() != 0)
                available.signalAll();
            return x;
        }
    } finally {
        lock.unlock();
    }
}
{code}
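
The peek()/poll() behaviour described above can be checked in isolation. The following is a minimal sketch, not Oozie code: Item is a hypothetical stand-in for QueueElement<E>, and peekThenPoll is a helper made up for this illustration. It only uses the standard java.util.concurrent.DelayQueue API.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueuePeekPollDemo {

    /** Hypothetical stand-in for Oozie's QueueElement: a name plus an absolute expiry time. */
    static final class Item implements Delayed {
        final String name;
        final long expiresAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative once the element is due.
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /**
     * Adds one element with the given delay, then calls peek() followed by poll().
     * Returns { name seen by peek(), name returned by poll() }, with null for "nothing".
     */
    static String[] peekThenPoll(String name, long delayMillis) {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.add(new Item(name, delayMillis));

        Item peeked = queue.peek(); // head of the queue, whether or not it has expired
        Item polled = queue.poll(); // null unless the head's delay has elapsed

        return new String[] {
            peeked == null ? null : peeked.name,
            polled == null ? null : polled.name
        };
    }

    public static void main(String[] args) {
        String[] result = peekThenPoll("pending", 60_000); // one-minute delay, not yet elapsed
        System.out.println("peek() saw:      " + result[0]);
        System.out.println("poll() returned: " + result[1]);
    }
}
```

With a delay that has not elapsed, peek() still exposes the element while poll() yields null, which is the case the calling code has to handle.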

> PollablePriorityDelayQueue.poll() returns elements with +ve delay
> -----------------------------------------------------------------
>
>                 Key: OOZIE-1664
>                 URL: https://issues.apache.org/jira/browse/OOZIE-1664
>             Project: Oozie
>          Issue Type: Bug
>            Reporter: Shwetha G S
>            Assignee: Shwetha G S
>         Attachments: OOZIE-1664.patch
>
>
> In CallableQueueService:
> {noformat}
>             queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
>                 @Override
>                 protected boolean eligibleToPoll(QueueElement<?> element) {
>                     if (element != null) {
>                         CallableWrapper wrapper = (CallableWrapper) element;
>                         if (element.getElement() != null) {
>                             return callableReachMaxConcurrency(wrapper.getElement());
>                         }
>                     }
>                     return false;
>                 }
>             };
> {noformat}
> eligibleToPoll() doesn't check whether the delay has elapsed, and 
> PollablePriorityDelayQueue.poll() iterates over all elements and returns any 
> element whose concurrency is not maxed out. The returned element can be one 
> with a positive delay, meaning it is not yet due for execution (its delay 
> has not elapsed), but it is picked up for execution anyway:
> {noformat}
>                     Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
>                     while(e == null && iter.hasNext()) {
>                         e = iter.next();
>                         if (eligibleToPoll(e)) {
>                             queues[i - 1].remove(e);
>                         }
>                         else {
>                         debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
>                             e = null;
>                         }
>                     }
> {noformat}
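
For comparison with the iterator-based loop quoted above: the JDK documents that DelayQueue.iterator() covers both expired and unexpired elements, and that remove(Object) removes an element whether or not it has expired. The sketch below illustrates only that JDK behaviour. Item and removeByIteration are hypothetical names for this example, and whether the same applies to Oozie depends on code not shown in this message.

```java
import java.util.Iterator;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueIteratorDemo {

    /** Hypothetical stand-in for Oozie's QueueElement: a name plus an absolute expiry time. */
    static final class Item implements Delayed {
        final String name;
        final long expiresAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /**
     * Adds one element with the given delay, then walks the queue with an iterator
     * and removes the first element found. If checkDelay is true, elements whose
     * delay has not elapsed are skipped. Returns the removed name, or null.
     */
    static String removeByIteration(String name, long delayMillis, boolean checkDelay) {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.add(new Item(name, delayMillis));

        Iterator<Item> iter = queue.iterator(); // visits expired and unexpired elements
        while (iter.hasNext()) {
            Item candidate = iter.next();
            if (checkDelay && candidate.getDelay(TimeUnit.NANOSECONDS) > 0) {
                continue; // not due yet
            }
            if (queue.remove(candidate)) { // remove(Object) does not look at the delay
                return candidate.name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("without delay check: " + removeByIteration("pending", 60_000, false));
        System.out.println("with delay check:    " + removeByIteration("pending", 60_000, true));
    }
}
```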



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
