Author: degenaro
Date: Sun Nov  2 12:32:10 2014
New Revision: 1636120

URL: http://svn.apache.org/r1636120
Log:
UIMA-4069 Redesign of JD toward the main goal of classpath separation for 
container (system) code.

Update dispatcher to handle preemption events.

Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java?rev=1636120&r1=1636119&r2=1636120&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/dispatch/Dispatcher.java
 Sun Nov  2 12:32:10 2014
@@ -18,6 +18,7 @@
 */
 package org.apache.uima.ducc.container.jd.dispatch;
 
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.uima.ducc.container.common.ContainerLogger;
@@ -30,6 +31,8 @@ import org.apache.uima.ducc.container.co
 import org.apache.uima.ducc.container.jd.CasManagerStats;
 import org.apache.uima.ducc.container.jd.JobDriverCasManager;
 import org.apache.uima.ducc.container.jd.JobDriverCommon;
+import org.apache.uima.ducc.container.jd.dispatch.iface.IRemoteWorkerIdentity;
+import org.apache.uima.ducc.container.jd.dispatch.iface.IWorkItem;
 import org.apache.uima.ducc.container.jd.fsm.wi.ActionData;
 import org.apache.uima.ducc.container.jd.fsm.wi.WiFsm;
 import org.apache.uima.ducc.container.jd.mh.iface.INodeInfo;
@@ -48,25 +51,77 @@ public class Dispatcher {
        }
        
        public IOperatingInfo handleGetOperatingInfo() {
-               IOperatingInfo retVal = new OperatingInfo();
-               JobDriverCasManager jdcm = 
JobDriverCommon.getInstance().getCasManager();
-               CasManagerStats cms = jdcm.getCasManagerStats();
-               retVal.setWorkItemCrTotal(cms.getCrTotal());
-               retVal.setWorkItemCrFetches(cms.getCrGets());
+               String location = "handleGetOperatingInfo";
+               IOperatingInfo retVal = null;
+               try {
+                       retVal = new OperatingInfo();
+                       JobDriverCasManager jdcm = 
JobDriverCommon.getInstance().getCasManager();
+                       CasManagerStats cms = jdcm.getCasManagerStats();
+                       retVal.setWorkItemCrTotal(cms.getCrTotal());
+                       retVal.setWorkItemCrFetches(cms.getCrGets());
+                       
retVal.setWorkItemPreemptions(cms.getNumberOfPreemptions());
+                       MessageBuffer mb = new MessageBuffer();
+                       
mb.append(Standardize.Label.crTotal.get()+retVal.getWorkItemCrTotal());
+                       
mb.append(Standardize.Label.crFetches.get()+retVal.getWorkItemCrFetches());
+                       
mb.append(Standardize.Label.preemptions.get()+retVal.getWorkItemPreemptions());
+                       logger.debug(location, IEntityId.null_id, 
mb.toString());
+               }
+               catch(Exception e) {
+                       logger.error(location, IEntityId.null_id, e);
+               }
                return retVal;
        }
        
        public void handleDownNode(INodeInfo nodeInfo) {
-               
+               String location = "handleDownNode";
+               try {
+                       ConcurrentHashMap<IRemoteWorkerIdentity, IWorkItem> map 
= JobDriverCommon.getInstance().getMap();
+                       //TODO
+               }
+               catch(Exception e) {
+                       logger.error(location, IEntityId.null_id, e);
+               }
        }
        
        public void handleDownProcess(IProcessInfo processInfo) {
-               
+               String location = "handleDownProcess";
+               try {
+                       ConcurrentHashMap<IRemoteWorkerIdentity, IWorkItem> map 
= JobDriverCommon.getInstance().getMap();
+                       //TODO
+               }
+               catch(Exception e) {
+                       logger.error(location, IEntityId.null_id, e);
+               }
        }
-
        
        public void handlePreemptProcess(IProcessInfo processInfo) {
-               
+               String location = "handlePreemptProcess";
+               try {
+                       ConcurrentHashMap<IRemoteWorkerIdentity, IWorkItem> map 
= JobDriverCommon.getInstance().getMap();
+                       for(Entry<IRemoteWorkerIdentity, IWorkItem> entry : 
map.entrySet()) {
+                               IRemoteWorkerIdentity rwi = entry.getKey();
+                               if(rwi.comprises(processInfo)) {
+                                       MessageBuffer mb = new MessageBuffer();
+                                       
mb.append(Standardize.Label.remote.get()+rwi.toString());
+                                       mb.append(Boolean.TRUE.toString());
+                                       logger.debug(location, 
IEntityId.null_id, mb.toString());
+                                       IWorkItem wi = entry.getValue();
+                                       IFsm fsm = wi.getFsm();
+                                       IEvent event = WiFsm.Process_Preempt;
+                                       Object actionData = new ActionData(wi, 
rwi, null);
+                                       fsm.transition(event, actionData);
+                               }
+                               else {
+                                       MessageBuffer mb = new MessageBuffer();
+                                       
mb.append(Standardize.Label.remote.get()+rwi.toString());
+                                       mb.append(Boolean.FALSE.toString());
+                                       logger.trace(location, 
IEntityId.null_id, mb.toString());
+                               }
+                       }
+               }
+               catch(Exception e) {
+                       logger.error(location, IEntityId.null_id, e);
+               }
        }
        
        public void handleMetaCasTransation(IMetaCasTransaction trans) {
@@ -132,7 +187,7 @@ public class Dispatcher {
                return wi;
        }
        
-       public void handleMetaCasTransationGet(IMetaCasTransaction trans, 
IRemoteWorkerIdentity rwi) {
+       private void handleMetaCasTransationGet(IMetaCasTransaction trans, 
IRemoteWorkerIdentity rwi) {
                IWorkItem wi = register(rwi);
                IFsm fsm = wi.getFsm();
                IEvent event = WiFsm.Get_Request;
@@ -140,7 +195,7 @@ public class Dispatcher {
                fsm.transition(event, actionData);
        }
        
-       public void handleMetaCasTransationAck(IMetaCasTransaction trans, 
IRemoteWorkerIdentity rwi) {
+       private void handleMetaCasTransationAck(IMetaCasTransaction trans, 
IRemoteWorkerIdentity rwi) {
                String location = "handleMetaCasTransationAck";
                IWorkItem wi = find(rwi);
                if(wi == null) {
@@ -158,7 +213,7 @@ public class Dispatcher {
                }
        }
        
-       public void handleMetaCasTransationEnd(IMetaCasTransaction trans, 
IRemoteWorkerIdentity rwi) {
+       private void handleMetaCasTransationEnd(IMetaCasTransaction trans, 
IRemoteWorkerIdentity rwi) {
                String location = "handleMetaCasTransationEnd";
                IWorkItem wi = find(rwi);
                if(wi == null) {


Reply via email to