[
https://issues.apache.org/jira/browse/OOZIE-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13867536#comment-13867536
]
Shwetha G S commented on OOZIE-1664:
------------------------------------
This part of the code is fine. It's the else part that has the issue. If the
peek() element has reached max concurrency, poll() iterates through the queue
and returns the first element that doesn't violate max concurrency:
{noformat}
else {
    if (e != null) {
        debug("poll(): the peek element [{0}], from P[{1}] is not eligible to poll",
              e.getElement().toString(), i);
    }
    e = null;
    Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
    while (e == null && iter.hasNext()) {
        e = iter.next();
        if (eligibleToPoll(e)) {
            queues[i - 1].remove(e);
        }
        else {
            debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll",
                  e.getElement().toString(), i);
            e = null;
        }
    }
}
{noformat}
It just iterates through the queue and doesn't check the delay, so it's
possible for this code to return an element whose delay is still +ve (not yet
elapsed).
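
One way to close the gap is to test the delay alongside eligibility inside the
fallback loop. The following is a minimal, self-contained sketch of that idea
and not necessarily what OOZIE-1664.patch does. The Item class, its eligible
flag (a stand-in for the max-concurrency check) and pollEligible() are
hypothetical names used only for illustration; the only real API relied on is
java.util.concurrent.Delayed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayAwarePollSketch {

    /** Hypothetical stand-in for a queue element: a name, a ready time and an eligibility flag. */
    static final class Item implements Delayed {
        final String name;
        final long readyAtMillis;
        final boolean eligible; // stand-in for "has not reached max concurrency"

        Item(String name, long delayMillis, boolean eligible) {
            this.name = name;
            this.readyAtMillis = System.currentTimeMillis() + delayMillis;
            this.eligible = eligible;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Positive while the element is not yet due, zero or negative once it is.
            return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Removes and returns the first element that is eligible AND whose delay has elapsed, else null. */
    static Item pollEligible(List<Item> queue) {
        Iterator<Item> iter = queue.iterator();
        while (iter.hasNext()) {
            Item candidate = iter.next();
            boolean due = candidate.getDelay(TimeUnit.MILLISECONDS) <= 0;
            if (candidate.eligible && due) {
                iter.remove();
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Item> queue = new ArrayList<>();
        queue.add(new Item("maxed-out", -1000, false)); // due, but not eligible
        queue.add(new Item("not-due", 60_000, true));   // eligible, but delay still +ve
        queue.add(new Item("due", -1000, true));        // eligible and due
        Item polled = pollEligible(queue);
        System.out.println(polled == null ? "null" : polled.name); // prints "due"
    }
}
```

Without the due check, the loop would hand back "not-due", which is the
behaviour described above.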
> PollablePriorityDelayQueue.poll() returns elements with +ve delay
> -----------------------------------------------------------------
>
> Key: OOZIE-1664
> URL: https://issues.apache.org/jira/browse/OOZIE-1664
> Project: Oozie
> Issue Type: Bug
> Reporter: Shwetha G S
> Assignee: Shwetha G S
> Attachments: OOZIE-1664.patch
>
>
> In CallableQueueService:
> {noformat}
> queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
>     @Override
>     protected boolean eligibleToPoll(QueueElement<?> element) {
>         if (element != null) {
>             CallableWrapper wrapper = (CallableWrapper) element;
>             if (element.getElement() != null) {
>                 return callableReachMaxConcurrency(wrapper.getElement());
>             }
>         }
>         return false;
>     }
> };
> {noformat}
> eligibleToPoll() doesn't check whether the delay is -ve, and
> PollablePriorityDelayQueue.poll() iterates over all elements and returns the
> first element whose concurrency is not maxed. The returned element can have a
> +ve delay, which means it is not yet due for execution (its delay has not
> elapsed), but it is picked up for execution anyway:
> {noformat}
> Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
> while (e == null && iter.hasNext()) {
>     e = iter.next();
>     if (eligibleToPoll(e)) {
>         queues[i - 1].remove(e);
>     }
>     else {
>         debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll",
>               e.getElement().toString(), i);
>         e = null;
>     }
> }
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)