Author: degenaro Date: Thu Oct 17 14:53:01 2013 New Revision: 1533095 URL: http://svn.apache.org/r1533095 Log: UIMA-3355 DUCC webserver (WS) shows more dispatched than total work items for Job ?
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java?rev=1533095&r1=1533094&r2=1533095&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java Thu Oct 17 14:53:01 2013 @@ -80,6 +80,8 @@ public class DriverStatusReport implemen private AtomicInteger threadCount = new AtomicInteger(0); private ConcurrentHashMap<String,DuccId> casQueuedMap = new ConcurrentHashMap<String,DuccId>(); + private ConcurrentHashMap<String,DuccId> casDequeuedPendingMap = new ConcurrentHashMap<String,DuccId>(); + private ConcurrentHashMap<String,HashMap<String,String>> casOperatingMap = new ConcurrentHashMap<String,HashMap<String,String>>(); private ConcurrentHashMap<Integer,DuccId> limboMap = new ConcurrentHashMap<Integer,DuccId>(); @@ -456,17 +458,46 @@ public class DriverStatusReport implemen } public int getWorkItemsQueued() { - return casQueuedMap.size(); + return casQueuedMap.size()+casDequeuedPendingMap.size(); } public void workItemQueued(String casId, DuccId jobId) { - casQueuedMap.put(casId, jobId); - logReport(); + String methodName = "workItemQueued"; + try { + synchronized(casQueuedMap) { + if(casQueuedMap.containsKey(casId)) { + duccOut.warn(methodName, duccId, casId+" already queued"); + } + if(casDequeuedPendingMap.containsKey(casId)) { + duccOut.warn(methodName, duccId, casId+" already dequeued"); + casDequeuedPendingMap.remove(casId); + } + else { + casQueuedMap.put(casId, jobId); + } + } + } + catch(Throwable t) { + duccOut.debug(methodName, duccId, t); + } } public void workItemDequeued(String casId) { - casQueuedMap.remove(casId); - logReport(); + String methodName = "workItemDequeued"; + try { + synchronized(casQueuedMap) { + if(!casQueuedMap.containsKey(casId)) { + duccOut.warn(methodName, duccId, casId+" not found"); + casDequeuedPendingMap.put(casId, duccId); + } + else { + casQueuedMap.remove(casId); + } + } + } + catch(Throwable t) { + duccOut.debug(methodName, duccId, t); + } } public int getWorkItemsDispatched() {