Author: degenaro
Date: Mon Nov 10 22:11:07 2014
New Revision: 1637989

URL: http://svn.apache.org/r1637989
Log:
UIMA-4069 Redesign of JD toward the main goal of classpath separation for 
container (system) code.

Add Job Driver error handler support for retrying a work item and for killing
the job (killJob), with JUnit test cases.

Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
 Mon Nov 10 22:11:07 2014
@@ -19,6 +19,7 @@
 package org.apache.uima.ducc.container.jd.cas;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class CasManagerStats {
@@ -35,6 +36,8 @@ public class CasManagerStats {
        private AtomicInteger endFailure = new AtomicInteger(0);
        private AtomicInteger endRetry = new AtomicInteger(0);
        
+       private AtomicBoolean killJob = new AtomicBoolean(false);
+       
        private ConcurrentHashMap<String,AtomicInteger> retryReasonsMap = new 
ConcurrentHashMap<String,AtomicInteger>();
        
        public void setCrTotal(int value) {
@@ -115,4 +118,12 @@ public class CasManagerStats {
        public int getEndRetry() {
                return endRetry.get();
        }
+       
+       public void setKillJob() {
+               killJob.set(true);
+       }
+       
+       public boolean isKillJob() {
+               return killJob.get();
+       }
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionEnd.java
 Mon Nov 10 22:11:07 2014
@@ -45,8 +45,18 @@ public class ActionEnd implements IActio
                return ActionEnd.class.getName();
        }
        
-       private void retry(CasManager cm, IWorkItem wi, IMetaCasTransaction 
trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
-               String location = "retry";
+       private void killJob(CasManager cm, IWorkItem wi, IMetaCasTransaction 
trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
+               String location = "killJob";
+               cm.getCasManagerStats().setKillJob();
+               MessageBuffer mb = new MessageBuffer();
+               
mb.append(Standardize.Label.transNo.get()+trans.getTransactionId().toString());
+               mb.append(Standardize.Label.seqNo.get()+metaCas.getSystemKey());
+               mb.append(Standardize.Label.remote.get()+rwi.toString());
+               logger.info(location, IEntityId.null_id, mb.toString());
+       }
+       
+       private void retryWorkItem(CasManager cm, IWorkItem wi, 
IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
+               String location = "retryWorkItem";
                cm.putMetaCas(metaCas, RetryReason.UserErrorRetry);
                cm.getCasManagerStats().incEndRetry();
                MessageBuffer mb = new MessageBuffer();
@@ -56,8 +66,8 @@ public class ActionEnd implements IActio
                logger.info(location, IEntityId.null_id, mb.toString());
        }
        
-       private void failure(CasManager cm, IWorkItem wi, IMetaCasTransaction 
trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
-               String location = "failure";
+       private void killWorkItem(CasManager cm, IWorkItem wi, 
IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
+               String location = "killWorkItem";
                cm.getCasManagerStats().incEndFailure();
                MessageBuffer mb = new MessageBuffer();
                
mb.append(Standardize.Label.transNo.get()+trans.getTransactionId().toString());
@@ -66,8 +76,8 @@ public class ActionEnd implements IActio
                logger.info(location, IEntityId.null_id, mb.toString());
        }
        
-       private void success(CasManager cm, IWorkItem wi, IMetaCasTransaction 
trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
-               String location = "success";
+       private void successWorkItem(CasManager cm, IWorkItem wi, 
IMetaCasTransaction trans, IMetaCas metaCas, IRemoteWorkerIdentity rwi) {
+               String location = "successWorkItem";
                cm.getCasManagerStats().incEndSuccess();
                wi.setTodEnd();
                updateStatistics(wi);
@@ -98,19 +108,22 @@ public class ActionEnd implements IActio
                                        ProxyJobDriverErrorHandler pjdeh = 
jd.getProxyJobDriverErrorHandler();
                                        ProxyJobDriverDirective pjdd = 
pjdeh.handle(cas, exception);
                                        if(pjdd != null) {
+                                               if(pjdd.isKillJob()) {
+                                                       killJob(cm, wi, trans, 
metaCas, rwi);
+                                               }
                                                if(pjdd.isKillWorkItem()) {
-                                                       failure(cm, wi, trans, 
metaCas, rwi);
+                                                       killWorkItem(cm, wi, 
trans, metaCas, rwi);
                                                }
                                                else {
-                                                       retry(cm, wi, trans, 
metaCas, rwi);
+                                                       retryWorkItem(cm, wi, 
trans, metaCas, rwi);
                                                }
                                        }
                                        else {
-                                               failure(cm, wi, trans, metaCas, 
rwi);
+                                               killWorkItem(cm, wi, trans, 
metaCas, rwi);
                                        }
                                }
                                else {
-                                       success(cm, wi, trans, metaCas, rwi);
+                                       successWorkItem(cm, wi, trans, metaCas, 
rwi);
                                }
                                wi.resetTods();
                        }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/Dispatcher.java
 Mon Nov 10 22:11:07 2014
@@ -69,6 +69,9 @@ public class Dispatcher {
                        oi.setWorkItemEndSuccesses(cms.getEndSuccess());
                        oi.setWorkItemEndFailures(cms.getEndFailure());
                        oi.setWorkItemEndRetrys(cms.getEndRetry());
+                       if(cms.isKillJob()) {
+                               oi.setKillJob();
+                       }
                        oi.setWorkItemPreemptions(cms.getNumberOfPreemptions());
                        oi.setWorkItemFinishedMillisMin(wis.getMillisMin());
                        oi.setWorkItemFinishedMillisMax(wis.getMillisMax());
@@ -81,6 +84,7 @@ public class Dispatcher {
                        
mb.append(Standardize.Label.crFetches.get()+oi.getWorkItemCrFetches());
                        
mb.append(Standardize.Label.endSuccess.get()+oi.getWorkItemEndSuccesses());
                        
mb.append(Standardize.Label.endFailure.get()+oi.getWorkItemEndFailures());
+                       
mb.append(Standardize.Label.killJob.get()+oi.isKillJob());
                        
mb.append(Standardize.Label.preemptions.get()+oi.getWorkItemPreemptions());
                        
mb.append(Standardize.Label.finishedMillisMin.get()+oi.getWorkItemFinishedMillisMin());
                        
mb.append(Standardize.Label.finishedMillisMax.get()+oi.getWorkItemFinishedMillisMax());

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/iface/IOperatingInfo.java
 Mon Nov 10 22:11:07 2014
@@ -75,4 +75,9 @@ public interface IOperatingInfo {
        
        public void setWorkItemTodMostRecentStart(long value);
        public long getWorkItemTodMostRecentStart();
+       
+       //
+       
+       public void setKillJob();
+       public boolean isKillJob();
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/impl/OperatingInfo.java
 Mon Nov 10 22:11:07 2014
@@ -42,6 +42,8 @@ public class OperatingInfo implements IO
        
        private long todMostRecentStart = 0;
        
+       private boolean killJob = false;
+       
        @Override
        public void setWorkItemCrTotal(int value) {
                crTotal = value;
@@ -207,4 +209,14 @@ public class OperatingInfo implements IO
                return todMostRecentStart;
        }
 
+       @Override
+       public void setKillJob() {
+               killJob = true;
+       }
+
+       @Override
+       public boolean isKillJob() {
+               return killJob;
+       }
+
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java?rev=1637989&r1=1637988&r2=1637989&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestDispatcher.java
 Mon Nov 10 22:11:07 2014
@@ -337,6 +337,13 @@ public class TestDispatcher extends ATes
                        asExpected("CASes error count == 
"+expectedErrorsTest04);
                        assertTrue(endSuccess+endFailure == 100);
                        asExpected("CASes failure+success count == 100");
+                       boolean killJob = oi.isKillJob();
+                       if(endFailure >= 15) {
+                               assertTrue(killJob == true);
+                       }
+                       else {
+                               assertTrue(killJob == false);
+                       }
                }
                catch(Exception e) {
                        e.printStackTrace();
@@ -360,10 +367,10 @@ public class TestDispatcher extends ATes
        }
        
        
-       // multiple node:pid:tid with errors & retrys
+       // multiple node:pid:tid with errors
        
        @Test
-       public void test_05() {
+       public void test_05a() {
                if(isDisabled(this.getClass().getName())) {
                        return;
                }
@@ -376,8 +383,6 @@ public class TestDispatcher extends ATes
                        jdCfg.setUserClasspath(Utilities.userCP);
                        jdCfg.setCrXml(crXml);
                        jdCfg.setCrCfg(crCfg);
-                       String eh = 
"org.apache.uima.ducc.user.jd.test.helper.TestJdContainerErrorHandlerRandomRetry";
-                       jdCfg.setErrorHandlerClassName(eh);
                        JobDriver.setInstance(jdCfg);
                        int size = JobDriver.getInstance().getMap().size();
                        debug("map size:"+size);
@@ -412,15 +417,83 @@ public class TestDispatcher extends ATes
                        asExpected("CASes fetched count == 100");
                        long endSuccess = oi.getWorkItemEndSuccesses();
                        long endFailure = oi.getWorkItemEndFailures();
-                       long endRetry = oi.getWorkItemEndRetrys();
                        debug("injected errors: "+inject);
                        debug("end success: "+endSuccess);
                        debug("end failure: "+endFailure);
-                       debug("end retry: "+endRetry);
+                       assertTrue(endFailure == expectedErrorsTest05);
+                       asExpected("CASes error count == 
"+expectedErrorsTest05);
                        assertTrue(endSuccess+endFailure == 100);
                        asExpected("CASes failure+success count == 100");
-                       assertTrue(endRetry > 0);
-                       asExpected("CASes retry count == "+endRetry);
+                       boolean killJob = oi.isKillJob();
+                       assertTrue(killJob == false);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail("Exception");
+               }
+       }
+       
+       @Test
+       public void test_05b() {
+               if(isDisabled(this.getClass().getName())) {
+                       return;
+               }
+               try {
+                       URL urlXml = this.getClass().getResource("/CR100.xml");
+                       File file = new File(urlXml.getFile());
+                       String crXml = file.getAbsolutePath();
+                       String crCfg = null;
+                       IJobDriverConfig jdCfg = new JobDriverConfig();
+                       jdCfg.setUserClasspath(Utilities.userCP);
+                       jdCfg.setCrXml(crXml);
+                       jdCfg.setCrCfg(crCfg);
+                       //
+                       String ehcp = "KillJobLimit="+2;
+                       jdCfg.setErrorHandlerConfigurationParameters(ehcp);
+                       //
+                       JobDriver.setInstance(jdCfg);
+                       int size = JobDriver.getInstance().getMap().size();
+                       debug("map size:"+size);
+                       Dispatcher dispatcher = new Dispatcher();
+                       ThreadInfoFactory tif = new ThreadInfoFactory(2,2,2);
+                       ThreadInfo ti = tif.getRandom();
+                       debug("random:"+ti.toKey());
+                       int casNo = -1;
+                       IMetaCas metaCasPrevious = null;
+                       IMetaCas metaCas = 
transGet(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+                       assertTrue(metaCas != null);
+                       int inject = 0;
+                       while(metaCas != null) {
+                               
transAck(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+                               if(randomErrorTest05()) {
+                                       Exception exception = new 
RuntimeException("injected error test #05");
+                                       
metaCas.setUserSpaceException(exception);
+                                       inject++;
+                               }
+                               
transEnd(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+                               casNo--;
+                               metaCasPrevious = metaCas;
+                               assertTrue(metaCasPrevious != null);
+                               ti = tif.getRandom();
+                               debug("random:"+ti.toKey());
+                               metaCas = 
transGet(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+                       }
+                       
assertTrue(metaCasPrevious.getSystemKey().equals("100"));
+                       asExpected("CASes processed count == 100");
+                       IOperatingInfo oi = dispatcher.handleGetOperatingInfo();
+                       assertTrue(oi.getWorkItemCrFetches() == 100);
+                       asExpected("CASes fetched count == 100");
+                       long endSuccess = oi.getWorkItemEndSuccesses();
+                       long endFailure = oi.getWorkItemEndFailures();
+                       debug("injected errors: "+inject);
+                       debug("end success: "+endSuccess);
+                       debug("end failure: "+endFailure);
+                       assertTrue(endFailure == expectedErrorsTest05);
+                       asExpected("CASes error count == 
"+expectedErrorsTest05);
+                       assertTrue(endSuccess+endFailure == 100);
+                       asExpected("CASes failure+success count == 100");
+                       boolean killJob = oi.isKillJob();
+                       assertTrue(killJob == true);
                }
                catch(Exception e) {
                        e.printStackTrace();
@@ -430,7 +503,9 @@ public class TestDispatcher extends ATes
        
        private long seedTest05 = 5;
        private Random randomTest05 = new Random(seedTest05);
-       private long pctTest05 = 15;
+       private long pctTest05 = 5;
+       
+       private long expectedErrorsTest05 = 7;
        
        private boolean randomErrorTest05() {
                boolean retVal = false;
@@ -440,4 +515,85 @@ public class TestDispatcher extends ATes
                }
                return retVal;
        }
+       
+       // multiple node:pid:tid with errors & retrys
+       
+       @Test
+       public void test_06() {
+               if(isDisabled(this.getClass().getName())) {
+                       return;
+               }
+               try {
+                       URL urlXml = this.getClass().getResource("/CR100.xml");
+                       File file = new File(urlXml.getFile());
+                       String crXml = file.getAbsolutePath();
+                       String crCfg = null;
+                       IJobDriverConfig jdCfg = new JobDriverConfig();
+                       jdCfg.setUserClasspath(Utilities.userCP);
+                       jdCfg.setCrXml(crXml);
+                       jdCfg.setCrCfg(crCfg);
+                       String eh = 
"org.apache.uima.ducc.user.jd.test.helper.TestJdContainerErrorHandlerRandomRetry";
+                       jdCfg.setErrorHandlerClassName(eh);
+                       JobDriver.setInstance(jdCfg);
+                       int size = JobDriver.getInstance().getMap().size();
+                       debug("map size:"+size);
+                       Dispatcher dispatcher = new Dispatcher();
+                       ThreadInfoFactory tif = new ThreadInfoFactory(2,2,2);
+                       ThreadInfo ti = tif.getRandom();
+                       debug("random:"+ti.toKey());
+                       int casNo = -1;
+                       IMetaCas metaCasPrevious = null;
+                       IMetaCas metaCas = 
transGet(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+                       assertTrue(metaCas != null);
+                       int inject = 0;
+                       while(metaCas != null) {
+                               
transAck(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+                               if(randomErrorTest06()) {
+                                       Exception exception = new 
RuntimeException("injected error test #06");
+                                       
metaCas.setUserSpaceException(exception);
+                                       inject++;
+                               }
+                               
transEnd(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+                               casNo--;
+                               metaCasPrevious = metaCas;
+                               assertTrue(metaCasPrevious != null);
+                               ti = tif.getRandom();
+                               debug("random:"+ti.toKey());
+                               metaCas = 
transGet(dispatcher,ti.getNode(),ti.getPid(),ti.getTid(),casNo);
+                       }
+                       
assertTrue(metaCasPrevious.getSystemKey().equals("100"));
+                       asExpected("CASes processed count == 100");
+                       IOperatingInfo oi = dispatcher.handleGetOperatingInfo();
+                       assertTrue(oi.getWorkItemCrFetches() == 100);
+                       asExpected("CASes fetched count == 100");
+                       long endSuccess = oi.getWorkItemEndSuccesses();
+                       long endFailure = oi.getWorkItemEndFailures();
+                       long endRetry = oi.getWorkItemEndRetrys();
+                       debug("injected errors: "+inject);
+                       debug("end success: "+endSuccess);
+                       debug("end failure: "+endFailure);
+                       debug("end retry: "+endRetry);
+                       assertTrue(endSuccess+endFailure == 100);
+                       asExpected("CASes failure+success count == 100");
+                       assertTrue(endRetry > 0);
+                       asExpected("CASes retry count == "+endRetry);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail("Exception");
+               }
+       }
+       
+       private long seedTest06 = 6;
+       private Random randomTest06 = new Random(seedTest06);
+       private long pctTest06 = 15;
+       
+       private boolean randomErrorTest06() {
+               boolean retVal = false;
+               int n = randomTest06.nextInt(100);
+               if(n < pctTest06) {
+                       retVal = true;
+               }
+               return retVal;
+       }
 }


Reply via email to