[
https://issues.apache.org/jira/browse/OOZIE-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13867006#comment-13867006
]
Rohini Palaniswamy commented on OOZIE-1664:
-------------------------------------------
I don't think this is an issue and the jira is invalid. Queueing elements with
delay in one of the basic stuff and if it was broken we should be seeing at
least some unit test failures.. QueueElement<E> implements Delayed and
java.util.concurrent.DelayQueue.poll();
{code}
e = queues[i - 1].peek();
if (eligibleToPoll(e)) {
e = queues[i - 1].poll();
}
{code}
Though peek() returns the first element in queue (return (E) queue[0];), the
poll() would return null and that is already handled in oozie code.
DelayQueue.java
{code}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
} finally {
lock.unlock();
}
}
{code}
> PollablePriorityDelayQueue.poll() returns elements with +ve delay
> -----------------------------------------------------------------
>
> Key: OOZIE-1664
> URL: https://issues.apache.org/jira/browse/OOZIE-1664
> Project: Oozie
> Issue Type: Bug
> Reporter: Shwetha G S
> Assignee: Shwetha G S
> Attachments: OOZIE-1664.patch
>
>
> In CallableQueueService:
> {noformat}
> queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 *
> 30, TimeUnit.MILLISECONDS, queueSize) {
> @Override
> protected boolean eligibleToPoll(QueueElement<?> element) {
> if (element != null) {
> CallableWrapper wrapper = (CallableWrapper) element;
> if (element.getElement() != null) {
> return
> callableReachMaxConcurrency(wrapper.getElement());
> }
> }
> return false;
> }
> };
> {noformat}
> elegibleToPoll() doesn't check if delay is -ve and in
> PollablePriorityQueue.poll() iterates over all elements and return any
> element whose concurrency is not maxed. The element returned can be an
> element with +ve delay which implies the element is still not up for
> execution(delay is not elapsed yet), but is picked up for execution
> {noformat}
> Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
> while(e == null && iter.hasNext()) {
> e = iter.next();
> if (eligibleToPoll(e)) {
> queues[i - 1].remove(e);
> }
> else {
> debug("poll(): the iterator element [{0}], from
> P[{1}] is not eligible to poll", e.getElement().toString(), i);
> e = null;
> }
> }
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)