[
https://issues.apache.org/jira/browse/OOZIE-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shwetha G S updated OOZIE-1664:
-------------------------------
Description:
In CallableQueueService:
{noformat}
queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
    @Override
    protected boolean eligibleToPoll(QueueElement<?> element) {
        if (element != null) {
            CallableWrapper wrapper = (CallableWrapper) element;
            if (element.getElement() != null) {
                return callableReachMaxConcurrency(wrapper.getElement());
            }
        }
        return false;
    }
};
{noformat}
eligibleToPoll() doesn't check whether the element's delay has elapsed (i.e. is
zero or negative). PollablePriorityDelayQueue.poll() iterates over all elements
and returns any element whose concurrency is not maxed out. The returned element
can therefore still have a positive delay, meaning it is not yet due for
execution, but it is picked up for execution anyway:
{noformat}
Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
while (e == null && iter.hasNext()) {
    e = iter.next();
    if (eligibleToPoll(e)) {
        queues[i - 1].remove(e);
    }
    else {
        debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll",
                e.getElement().toString(), i);
        e = null;
    }
}
{noformat}
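One possible direction for a fix, sketched as a self-contained example rather than as the actual Oozie patch: make the eligibility check require that the delay has elapsed in addition to the concurrency check. The class DelayAwarePollSketch, its nested Element type, and the concurrencyAvailable flag are hypothetical stand-ins for QueueElement and callableReachMaxConcurrency(); only java.util.concurrent.Delayed and TimeUnit are real APIs here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayAwarePollSketch {

    /** Hypothetical stand-in for a queue element: a name plus the time at which it becomes due. */
    static final class Element implements Delayed {
        final String name;
        final long dueAtMillis;
        final boolean concurrencyAvailable; // stand-in for the concurrency check

        Element(String name, long delayMillis, boolean concurrencyAvailable) {
            this.name = name;
            this.dueAtMillis = System.currentTimeMillis() + delayMillis;
            this.concurrencyAvailable = concurrencyAvailable;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the element is due.
            return unit.convert(dueAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Eligible only if the delay has elapsed AND the concurrency check passes. */
    static boolean eligibleToPoll(Element e) {
        return e != null
                && e.getDelay(TimeUnit.MILLISECONDS) <= 0
                && e.concurrencyAvailable;
    }

    /** Removes and returns the first eligible element, or null if none is eligible. */
    static Element poll(List<Element> queue) {
        Iterator<Element> iter = queue.iterator();
        while (iter.hasNext()) {
            Element e = iter.next();
            if (eligibleToPoll(e)) {
                iter.remove();
                return e;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Element> queue = new ArrayList<>();
        queue.add(new Element("not-yet-due", 60_000, true));     // positive delay: must be skipped
        queue.add(new Element("due-but-throttled", -1, false));  // due, but concurrency is maxed
        queue.add(new Element("due-and-free", -1, true));        // due and allowed to run

        Element polled = poll(queue);
        System.out.println(polled == null ? "nothing eligible" : polled.name);
    }
}
```

With the delay check in place, the element that is not yet due stays in the queue even though its concurrency is available, which is the behaviour the current poll() fails to guarantee.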
was:
In CallableQueueService:
{noformat}
queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
    @Override
    protected boolean eligibleToPoll(QueueElement<?> element) {
        if (element != null) {
            CallableWrapper wrapper = (CallableWrapper) element;
            if (element.getElement() != null) {
                return callableReachMaxConcurrency(wrapper.getElement());
            }
        }
        return false;
    }
};
{noformat}
eligibleToPoll() doesn't check whether the element's delay has elapsed (i.e. is
zero or negative). PollablePriorityDelayQueue.poll() iterates over all elements
and returns any element whose concurrency is not maxed out:
{noformat}
Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
while (e == null && iter.hasNext()) {
    e = iter.next();
    if (eligibleToPoll(e)) {
        queues[i - 1].remove(e);
    }
    else {
        debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll",
                e.getElement().toString(), i);
        e = null;
    }
}
{noformat}
> PollablePriorityDelayQueue.poll() returns elements with +ve delay
> -----------------------------------------------------------------
>
> Key: OOZIE-1664
> URL: https://issues.apache.org/jira/browse/OOZIE-1664
> Project: Oozie
> Issue Type: Bug
> Reporter: Shwetha G S
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)