Repository: oozie
Updated Branches:
  refs/heads/master fca69e955 -> 872db60c8


OOZIE-1699 Some of the commands submitted to Oozie internal queue are never 
executed (sriksun via virag)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/872db60c
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/872db60c
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/872db60c

Branch: refs/heads/master
Commit: 872db60c8da0d03900e057923ae41f043ffc6b59
Parents: fca69e9
Author: Virag Kothari <[email protected]>
Authored: Mon Feb 24 10:30:27 2014 -0800
Committer: Virag Kothari <[email protected]>
Committed: Mon Feb 24 10:30:27 2014 -0800

----------------------------------------------------------------------
 .../oozie/service/CallableQueueService.java     | 35 +++++++-----
 .../oozie/util/PollablePriorityDelayQueue.java  |  2 +-
 .../apache/oozie/util/PriorityDelayQueue.java   | 32 ++++++-----
 .../oozie/service/TestCallableQueueService.java | 59 +++++++++++++++++---
 release-log.txt                                 |  1 +
 5 files changed, 92 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/872db60c/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java 
b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index ab81b09..093eb08 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -153,16 +152,17 @@ public class CallableQueueService implements Service, 
Instrumentable {
         }
 
         public void run() {
-            if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
-                log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with 
[{1}]ms delay", getElement().getType(),
-                        SAFE_MODE_DELAY);
-                setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
-                removeFromUniqueCallables();
-                queue(this, true);
-                return;
-            }
-            XCallable<?> callable = getElement();
+            XCallable<?> callable = null;
             try {
+                removeFromUniqueCallables();
+                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
+                    log.info("Oozie is in SAFEMODE, requeuing callable [{0}] 
with [{1}]ms delay", getElement().getType(),
+                            SAFE_MODE_DELAY);
+                    setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
+                    queue(this, true);
+                    return;
+                }
+                callable = getElement();
                 if (callableBegin(callable)) {
                     cron.stop();
                     addInQueueCron(cron);
@@ -170,7 +170,6 @@ public class CallableQueueService implements Service, 
Instrumentable {
                     XLog log = XLog.getLog(getClass());
                     log.trace("executing callable [{0}]", callable.getName());
 
-                    removeFromUniqueCallables();
                     try {
                         callable.call();
                         incrCounter(INSTR_EXECUTED_COUNTER, 1);
@@ -188,13 +187,19 @@ public class CallableQueueService implements Service, 
Instrumentable {
                     log.warn("max concurrency for callable [{0}] exceeded, 
requeueing with [{1}]ms delay", callable
                             .getType(), CONCURRENCY_DELAY);
                     setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
-                    removeFromUniqueCallables();
                     queue(this, true);
                     incrCounter(callable.getType() + "#exceeded.concurrency", 
1);
                 }
             }
+            catch (Throwable t) {
+                incrCounter(INSTR_FAILED_COUNTER, 1);
+                log.warn("exception callable [{0}], {1}", callable == null ? 
"N/A" : callable.getName(),
+                        t.getMessage(), t);
+            }
             finally {
-                callableEnd(callable);
+                if (callable != null) {
+                    callableEnd(callable);
+                }
             }
         }
 
@@ -558,9 +563,9 @@ public class CallableQueueService implements Service, 
Instrumentable {
                 try {
                     executor.execute(wrapper);
                 }
-                catch (RejectedExecutionException ree) {
+                catch (Throwable ree) {
                     wrapper.removeFromUniqueCallables();
-                    throw ree;
+                    throw new RuntimeException(ree);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/872db60c/core/src/main/java/org/apache/oozie/util/PollablePriorityDelayQueue.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/PollablePriorityDelayQueue.java 
b/core/src/main/java/org/apache/oozie/util/PollablePriorityDelayQueue.java
index 6645544..6d692e3 100644
--- a/core/src/main/java/org/apache/oozie/util/PollablePriorityDelayQueue.java
+++ b/core/src/main/java/org/apache/oozie/util/PollablePriorityDelayQueue.java
@@ -39,8 +39,8 @@ public class PollablePriorityDelayQueue<E> extends 
PriorityDelayQueue<E> {
      */
     @Override
     public QueueElement<E> poll() {
+        lock.lock();
         try {
-            lock.lock();
             antiStarvation();
             QueueElement<E> e = null;
             int i = priorities;

http://git-wip-us.apache.org/repos/asf/oozie/blob/872db60c/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java 
b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
index 8b4e0ff..a3f2148 100644
--- a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
+++ b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java
@@ -258,8 +258,8 @@ public class PriorityDelayQueue<E> extends 
AbstractQueue<PriorityDelayQueue.Queu
     @SuppressWarnings("unchecked")
     public Iterator<QueueElement<E>> iterator() {
         QueueElement[][] queueElements = new QueueElement[queues.length][];
+        lock.lock();
         try {
-            lock.lock();
             for (int i = 0; i < queues.length; i++) {
                 queueElements[i] = queues[i].toArray(new QueueElement[0]);
             }
@@ -340,23 +340,29 @@ public class PriorityDelayQueue<E> extends 
AbstractQueue<PriorityDelayQueue.Queu
         if (queueElement == null) {
             throw new NullPointerException("queueElement is NULL");
         }
-        if (queueElement.getPriority() < 0 && queueElement.getPriority() >= 
priorities) {
-            throw new IllegalArgumentException("priority out of range");
+        if (queueElement.getPriority() < 0 || queueElement.getPriority() >= 
priorities) {
+            throw new IllegalArgumentException("priority out of range: " + 
queueElement);
         }
         if (queueElement.inQueue) {
-            throw new IllegalStateException("queueElement already in a queue");
+            throw new IllegalStateException("queueElement already in a queue: 
" + queueElement);
         }
         if (!ignoreSize && currentSize != null && currentSize.get() >= 
maxSize) {
             return false;
         }
-        boolean accepted = 
queues[queueElement.getPriority()].offer(queueElement);
-        debug("offer([{0}]), to P[{1}] delay[{2}ms] accepted[{3}]", 
queueElement.getElement().toString(),
-              queueElement.getPriority(), 
queueElement.getDelay(TimeUnit.MILLISECONDS), accepted);
-        if (accepted) {
-            if (currentSize != null) {
-                currentSize.incrementAndGet();
+        boolean accepted;
+        lock.lock();
+        try {
+            accepted = queues[queueElement.getPriority()].offer(queueElement);
+            debug("offer([{0}]), to P[{1}] delay[{2}ms] accepted[{3}]", 
queueElement.getElement().toString(),
+                  queueElement.getPriority(), 
queueElement.getDelay(TimeUnit.MILLISECONDS), accepted);
+            if (accepted) {
+                if (currentSize != null) {
+                    currentSize.incrementAndGet();
+                }
+                queueElement.inQueue = true;
             }
-            queueElement.inQueue = true;
+        } finally {
+            lock.unlock();
         }
         return accepted;
     }
@@ -390,8 +396,8 @@ public class PriorityDelayQueue<E> extends 
AbstractQueue<PriorityDelayQueue.Queu
      */
     @Override
     public QueueElement<E> poll() {
+        lock.lock();
         try {
-            lock.lock();
             antiStarvation();
             QueueElement<E> e = null;
             int i = priorities;
@@ -421,8 +427,8 @@ public class PriorityDelayQueue<E> extends 
AbstractQueue<PriorityDelayQueue.Queu
      */
     @Override
     public QueueElement<E> peek() {
+        lock.lock();
         try {
-            lock.lock();
             antiStarvation();
             QueueElement<E> e = null;
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/872db60c/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java 
b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
index 903866d..fefb448 100644
--- a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
@@ -17,20 +17,21 @@
  */
 package org.apache.oozie.service;
 
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
+import junit.framework.Assert;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.XCommand;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.XCallable;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class TestCallableQueueService extends XTestCase {
     static AtomicLong EXEC_ORDER = new AtomicLong();
 
@@ -872,4 +873,46 @@ public class TestCallableQueueService extends XTestCase {
         assertTrue(intCallable.executed > callables.get(5).executed);
     }
 
+    public void testRemoveUniqueCallables() throws Exception {
+        XCommand command = new XCommand("Test", "type", 100) {
+            @Override
+            protected boolean isLockRequired() {
+                return false;
+            }
+
+            @Override
+            public String getEntityKey() {
+                return "TEST";
+            }
+
+            @Override
+            protected void loadState() throws CommandException {
+            }
+
+            @Override
+            protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+            }
+
+            @Override
+            protected Object execute() throws CommandException {
+                return null;
+            }
+        };
+        Services.get().destroy();
+        setSystemProperty(CallableQueueService.CONF_THREADS, "1");
+        new Services().init();
+
+        CallableQueueService queueservice = 
Services.get().get(CallableQueueService.class);
+        List<String> uniquesBefore = queueservice.getUniqueDump();
+        try {
+            queueservice.queue(command);
+            fail("Expected illegal argument exception: priority = 100");
+        }
+        catch (Exception e) {
+            assertTrue(e.getCause() != null && e.getCause() instanceof 
IllegalArgumentException);
+        }
+        List<String> uniquesAfter = queueservice.getUniqueDump();
+        uniquesAfter.removeAll(uniquesBefore);
+        assertTrue(uniquesAfter.toString(), uniquesAfter.isEmpty());
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/872db60c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 3f8b9bb..fa0540f 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1699 Some of the commands submitted to Oozie internal queue are never 
executed (sriksun via virag)
 OOZIE-1671 add an option to limit # of coordinator actions for log retrieval 
(ryota)
 OOZIE-1629 EL function in <timeout> is not evaluated properly (ryota)
 OOZIE-1618 dryrun should check variable substitution in workflow.xml 
(bowenzhangusa via rkanter)

Reply via email to