Author: degenaro
Date: Sat Apr 23 10:46:49 2016
New Revision: 1740644

URL: http://svn.apache.org/viewvc?rev=1740644&view=rev
Log:
UIMA-4902 DUCC Job Driver (JD) add programmability feature to built-in error 
handler

Added:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
   (with props)
Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java?rev=1740644&r1=1740643&r2=1740644&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java
 Sat Apr 23 10:46:49 2016
@@ -18,41 +18,18 @@
 */
 package org.apache.uima.ducc;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.uima.ducc.ErrorHandlerProgrammability.Key;
 import org.apache.uima.ducc.logger.ToLog;
-import org.apache.uima.ducc.user.common.QuotedOptions;
 import org.apache.uima.ducc.user.error.iface.Transformer;
-import org.apache.uima.ducc.user.jd.JdUser;
 
 public class ErrorHandler implements IErrorHandler {
 
-       /**
-        * These are the System Properties nominally specified in the
-        * user's job submission (as -D's for driver_jvm_args) which will 
-        * be considered for adjusting runtime operational characteristics.
-        */
-       public static enum SystemPropertyNames {
-               JobDriverErrorHandlerMaximumNumberOfTimeoutRetrysPerWorkItem,
-       }
-       
-       /**
-        * Return a directive with isKillWorkItem() == false unless and until 
-        * the maximumNumberOfTimeoutRetrysPerWorkItem is exceeded.
-        */
-       private AtomicInteger maximumNumberOfTimeoutRetrysPerWorkItem = new 
AtomicInteger(0);
-       
-       /**
-        * Flag to insure initialization occurs exactly once.
-        */
-       private AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
+       private String initializationData = null;
+       private ErrorHandlerProgrammability ehp = null;
        
        /**
         * A map comprising an entry for each work item with a corresponding 
count
@@ -60,143 +37,42 @@ public class ErrorHandler implements IEr
         */
        private ConcurrentHashMap<String,AtomicLong> retryMap = new 
ConcurrentHashMap<String,AtomicLong>();
        
-       public enum InitializationDataKey { 
-               KillJobLimit("max_job_errors"), 
-               ;
-               
-               private String altname = null;
-               
-               private InitializationDataKey() {
-                       altname = name();
-               }
-               
-               private InitializationDataKey(String value) {
-                       altname = value;
-               }
-               
-               public String altname() {
-                       return altname;
-               }
-       };
-       
-       private static int DefaultJobErrorLimit = JdUser.DefaultJobErrorLimit;
-       
-       private AtomicInteger jobErrorLimit = new 
AtomicInteger(DefaultJobErrorLimit);
-       
+       /**
+        * The number of work item errors encountered for the job
+        */
        private AtomicInteger jobErrorCount = new AtomicInteger(0);
        
        public ErrorHandler() {
        }
        
        public ErrorHandler(String initializationData) {
-               initialize(initializationData);
-       }
-       
-       /**
-        * Insure we initialize exactly once
-        */
-       private void initializeOnce() {
-               synchronized(ErrorHandler.class) {
-                       if(!alreadyInitialized.get()) {
-                               Properties systemProperties = 
System.getProperties();
-                               initTimeoutRetrys(systemProperties);
-                               alreadyInitialized.set(true);
-                       }
-               }
+               setInitializationData(initializationData);
        }
        
-       /**
-        * Use the user specified 
-DJobDriverErrorHandlerMaximumNumberOfTimeoutRetrysPerWorkItem to set
-        * max timeouts per work item, if specified otherwise 0 meaning no 
retrys
-        */
-       private void initTimeoutRetrys(Properties systemProperties) {
-               String key = 
SystemPropertyNames.JobDriverErrorHandlerMaximumNumberOfTimeoutRetrysPerWorkItem.name();
-               if(systemProperties != null) {
-                       if(systemProperties.containsKey(key)) {
-                               String value = 
systemProperties.getProperty(key);
-                               try {
-                                       int integer = Integer.parseInt(value);
-                                       if(integer < 0) {
-                                               String text = "Invalid: 
"+key+"="+value;
-                                               
ToLog.info(ErrorHandler.class,text);
-                                       }
-                                       else {
-                                               
maximumNumberOfTimeoutRetrysPerWorkItem.set(integer);;
-                                               String text = "Override: 
"+key+"="+maximumNumberOfTimeoutRetrysPerWorkItem.get();
-                                               
ToLog.info(ErrorHandler.class,text);
-                                       }
-                               }
-                               catch(Exception e) {
-                                       String text = "Invalid: "+key+"="+value;
-                                       ToLog.info(ErrorHandler.class,text);
-                               }
-                       }
-                       else {
-                               String text = "Default: 
"+key+"="+maximumNumberOfTimeoutRetrysPerWorkItem.get();
-                               ToLog.info(ErrorHandler.class,text);
-                       }
-               }
-               else {
-                       String text = "Default: 
"+key+"="+maximumNumberOfTimeoutRetrysPerWorkItem.get();
-                       ToLog.info(ErrorHandler.class,text);
-               }
+       private void setInitializationData(String value) {
+               initializationData = value;
        }
        
-       private Map<String, String> parse(String initializationData) {
-               Map<String, String> map = new HashMap<String, String>();
-               try {
-                       if(initializationData != null) {
-                               ArrayList<String> toks = 
QuotedOptions.tokenizeList(initializationData, true);
-                               if(toks != null) {
-                                       for(String tok : toks) {
-                                               String[] split = tok.split("=");
-                                               String key = 
split[0].trim().toLowerCase();
-                                               String value = split[1].trim();
-                                               map.put(key, value);
-                                       }
-                               }
-                       } 
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-               }
-               return map;
+       private String getInitializationData() {
+               return initializationData;
        }
        
        @Override
        public void initialize(String initializationData) {
-               if(initializationData != null) {
-                       Map<String, String> map = parse(initializationData);
-                       String key;
-                       key = 
InitializationDataKey.KillJobLimit.name().toLowerCase();
-                       if(map.containsKey(key)) {
-                               String value = map.get(key);
-                               initKillJob(value);
-                       }
-                       else {
-                               String altkey = 
InitializationDataKey.KillJobLimit.altname();
-                               if(map.containsKey(altkey)) {
-                                       String value = map.get(altkey);
-                                       initKillJob(value);
-                               }
-                       }
-               }
-       }
-
-       private void initKillJob(String value) {
-               try {
-                       int expect = DefaultJobErrorLimit;
-                       int update = Integer.parseInt(value);
-                       jobErrorLimit.compareAndSet(expect, update);
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-               }
+               setInitializationData(initializationData);
        }
        
        @Override
        public IErrorHandlerDirective handle(String serializedCAS, Object 
object) {
-               initializeOnce();
+               // Do not actually initialize until the first handle situation
+               // arises so as to not create the file ErrorHandler.log unless 
+               // a work item error or timeout occurs.
+               synchronized(ErrorHandler.class) {
+                       if(ehp == null) {
+                               ehp = new 
ErrorHandlerProgrammability(getInitializationData());
+                       }
+               }
+               // Make ready a default directive for return
                ErrorHandlerDirective jdUserDirective = new 
ErrorHandlerDirective();
                try {
                        Throwable userThrowable = null;
@@ -209,14 +85,16 @@ public class ErrorHandler implements IEr
                                        userThrowable = (Throwable) object;
                                        userThrowable.getClass();
                                        ToLog.info(ErrorHandler.class, 
serializedCAS);
-                                       ToLog.info(ErrorHandler.class, 
userThrowable);
+                                       ToLog.warning(ErrorHandler.class, 
userThrowable);
                                        if(serializedCAS != null) {
                                                
retryMap.putIfAbsent(serializedCAS, new AtomicLong(0));
                                                AtomicLong retryCount = 
retryMap.get(serializedCAS);
                                                long count = 
retryCount.incrementAndGet();
-                                               if(count <= 
maximumNumberOfTimeoutRetrysPerWorkItem.get()) {
+                                               Integer 
max_timeout_retrys_per_workitem = 
ehp.getInteger(Key.max_timeout_retrys_per_workitem);
+                                               // don't kill work item if 
still eligible for timeout retry
+                                               if(count <= 
max_timeout_retrys_per_workitem) {
                                                        
jdUserDirective.resetKillWorkItem();
-                                                       String text = "retry # 
"+count+" of "+maximumNumberOfTimeoutRetrysPerWorkItem.get()+" for: 
"+serializedCAS;
+                                                       String text = "retry # 
"+count+" of "+max_timeout_retrys_per_workitem+" for: "+serializedCAS;
                                                        
ToLog.info(ErrorHandler.class,text);
                                                }
                                                else {
@@ -238,13 +116,16 @@ public class ErrorHandler implements IEr
                        else {
                                jobErrorCount.incrementAndGet();
                        }
-                       if(jobErrorCount.get() > jobErrorLimit.get()) {
+                       Integer max_job_errors = 
ehp.getInteger(Key.max_job_errors);
+                       // kill job if max errors limit is surpassed
+                       if(jobErrorCount.get() > max_job_errors) {
                                jdUserDirective.setKillJob();
                        }
                }
                catch(Exception e) {
                        e.printStackTrace();
                }
+               // record results in ErrorHandler.log
                StringBuffer sb = new StringBuffer();
                sb.append("KillJob: ");
                sb.append(jdUserDirective.isKillJob());

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java?rev=1740644&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
 Sat Apr 23 10:46:49 2016
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.logger.ToLog;
+
+public class ErrorHandlerProgrammability {
+
+       /**
+        * This class interprets the <string> specified for
+        * --driver_exception_handler_arguments of the ducc_submit command.
+        */
+       
+       private Map<Key,Integer> map = new ConcurrentHashMap<Key,Integer>();
+       
+       /**
+        * Supported keywords for 
+        * ducc_submit --driver_exception_handler_arguments <string>
+        */
+       public enum Key { 
+               max_job_errors,
+               max_timeout_retrys_per_workitem,
+               ;
+       }
+       
+       /**
+        * Initialize with defaults
+        */
+       {
+               map.put(Key.max_job_errors, new Integer(15));
+               map.put(Key.max_timeout_retrys_per_workitem, new Integer(0));
+       }
+       
+       public ErrorHandlerProgrammability() {
+               dumpArgs();
+               dumpMap();
+       }
+       
+       /**
+        * @param argString is a blank-delimited String of key=value pairs 
+        *        with no embedded blanks.  Example: keyA=valA keyB=valB...
+        */
+       
+       public ErrorHandlerProgrammability(String argString) {
+               dumpArgs(argString);
+               String[] args = toArgs(argString);
+               override(args);
+               dumpMap();
+       }
+       
+       private String[] toArgs(String argString) {
+               String[] list = null;
+               if(argString != null) {
+                       list = argString.split("\\s+");
+               }
+               return list;
+       }
+       
+       /**
+        * @param args is an array of key=value Strings
+        */
+       
+       public ErrorHandlerProgrammability(String[] args) {
+               dumpArgs(args);
+               override(args);
+               dumpMap();
+       }
+       
+       /**
+        * @param args is an array of key=value Strings
+        * 
+        * Replace the default values in the map with those specified
+        * by the passed in args
+        */
+       private void override(String[] args) {
+               if(args != null) {
+                       for(String arg : args) {
+                               String[] kvp = arg.split("=");
+                               if(kvp.length == 2) {
+                                       Key key = parseKey(kvp[0]);
+                                       Integer value = parseValue(kvp[1]);
+                                       map.put(key,value);
+                                       String text = "override: 
"+key.name()+"="+value;
+                                       ToLog.info(ErrorHandler.class,text);
+                               }
+                               else {
+                                       String text = "illegal argument: "+arg;
+                                       ToLog.warning(ErrorHandler.class,text);
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * @param name is one of the expected String
+        * @return Key is the corresponding enum
+        */
+       
+       private Key parseKey(String name) {
+               Key key = null;
+               if(name != null) {
+                       for(Key k : Key.values()) {
+                               if(name.equals(k.name())) {
+                                       key = k;
+                                       break;
+                               }
+                       }
+                       if(key == null) {
+                               String text = "illegal argument: "+name;
+                               ToLog.warning(ErrorHandler.class,text);
+                       }
+               }
+               else {
+                       String text = "missing argument: "+"<name>=";
+                       ToLog.warning(ErrorHandler.class,text);
+               }
+               return key;
+       }
+       
+       /**
+        * 
+        * @param value is a String representation of an integer
+        * @return the value as an Integer
+        */
+       private Integer parseValue(String value) {
+               int iVal = 0;
+               if(value != null) {
+                       try {
+                               iVal = Integer.parseInt(value);
+                       }
+                       catch(Exception e) {
+                               String text = "illegal argument: "+value;
+                               ToLog.warning(ErrorHandler.class,text);
+                       }
+               }
+               else {
+                       String text = "missing argument: "+"=<value>";
+                       ToLog.warning(ErrorHandler.class,text);
+               }
+               return iVal;
+       }
+       
+       public Integer getInteger(Key key) {
+               return map.get(key);
+       }
+       
+       // The below methods are for debugging and are nominally silent
+       
+       private void dumpArgs() {
+               String text = "args: "+"none";
+               ToLog.debug(ErrorHandler.class,text);
+       }
+       
+       public void dumpArgs(String args) {
+               if(args == null) {
+                       String text = "argString: "+"null";
+                       ToLog.debug(ErrorHandler.class,text);
+               }
+               else {
+                       String text = "argString: "+args;
+                       ToLog.debug(ErrorHandler.class,text);
+               }
+       }
+       
+       public void dumpArgs(String[] args) {
+               if(args == null) {
+                       String text = "args: "+"null";
+                       ToLog.debug(ErrorHandler.class,text);
+               }
+               else {
+                       StringBuffer sb = new StringBuffer();
+                       for(String arg : args) {
+                               sb.append(arg);
+                               sb.append(" ");
+                       }
+                       String text = "args: "+sb.toString().trim();
+                       ToLog.debug(ErrorHandler.class,text);
+               }
+       }
+       
+       private void dumpMap() {
+               for(Entry<Key, Integer> entry : map.entrySet()) {
+                       String key = entry.getKey().name();
+                       Integer value = entry.getValue();
+                       String text = key+"="+value;
+                       ToLog.debug(ErrorHandler.class,text);
+               }
+       }
+}

Propchange: 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java?rev=1740644&r1=1740643&r2=1740644&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java
 Sat Apr 23 10:46:49 2016
@@ -75,6 +75,8 @@ public class ToLog {
                return logger;
        }
        
+       // Note: close to insure that lock file is erased
+       
        private static void close(Logger logger) {
                Handler[] handlers = logger.getHandlers();
                if(handlers != null) {
@@ -85,7 +87,7 @@ public class ToLog {
        }
        
        /**
-        * Write a String message into ErrorHandler.log file
+        * Write an "info"  String message into ErrorHandler.log file
         */
        public static void info(Class<?> clazz, String text) {
                if(clazz != null) {
@@ -100,14 +102,44 @@ public class ToLog {
        }
        
        /**
+        * Write a "debug" String message into ErrorHandler.log file
+        */
+       public static void debug(Class<?> clazz, String text) {
+               if(clazz != null) {
+                       if(text != null) {
+                               Logger logger = getLogger(clazz);
+                               if(logger != null) {
+                                       logger.log(Level.FINE, text);
+                                       close(logger);
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * Write a "warning" String message into ErrorHandler.log file
+        */
+       public static void warning(Class<?> clazz, String text) {
+               if(clazz != null) {
+                       if(text != null) {
+                               Logger logger = getLogger(clazz);
+                               if(logger != null) {
+                                       logger.log(Level.WARNING, text);
+                                       close(logger);
+                               }
+                       }
+               }
+       }
+       
+       /**
         * Write a Throwable message into ErrorHandler.log file
         */
-       public static void info(Class<?> clazz, Throwable t) {
+       public static void warning(Class<?> clazz, Throwable t) {
                if(clazz != null) {
                        if(t != null) {
                                Logger logger = getLogger(clazz);
                                if(logger != null) {
-                                       logger.log(Level.INFO, t.getMessage(), 
t);
+                                       logger.log(Level.WARNING, 
t.getMessage(), t);
                                        close(logger);
                                }
                        }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java?rev=1740644&r1=1740643&r2=1740644&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java
 Sat Apr 23 10:46:49 2016
@@ -31,7 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.uima.ducc.ErrorHandler;
-import org.apache.uima.ducc.ErrorHandler.InitializationDataKey;
+import org.apache.uima.ducc.ErrorHandlerProgrammability;
 import org.apache.uima.ducc.IErrorHandler;
 import org.apache.uima.ducc.IErrorHandlerDirective;
 import org.apache.uima.ducc.user.common.ExceptionHelper;
@@ -280,7 +280,7 @@ public class TestSuite {
                        assertTrue(directive.isKillWorkItem() == true);
                        //
                        limit = 10;
-                       plist = 
InitializationDataKey.KillJobLimit.name()+"="+limit;
+                       plist = 
ErrorHandlerProgrammability.Key.max_job_errors.name()+"="+limit;
                        eh = new ErrorHandler(plist);
                        directive = eh.handle(serializedCAS, 
getUserException());
                        for(int i=1; i<limit; i++) {
@@ -295,7 +295,7 @@ public class TestSuite {
                        assertTrue(directive.isKillWorkItem() == true);
                        //
                        limit = 20;
-                       plist = 
InitializationDataKey.KillJobLimit.name()+"="+limit;
+                       plist = 
ErrorHandlerProgrammability.Key.max_job_errors.name()+"="+limit;
                        eh = new ErrorHandler(plist);
                        directive = eh.handle(serializedCAS, 
getUserException());
                        for(int i=1; i<limit; i++) {


Reply via email to