Author: degenaro
Date: Sat Apr 23 10:46:49 2016
New Revision: 1740644
URL: http://svn.apache.org/viewvc?rev=1740644&view=rev
Log:
UIMA-4902 DUCC Job Driver (JD) add programmability feature to built-in error
handler
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
(with props)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java?rev=1740644&r1=1740643&r2=1740644&view=diff
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java
(original)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandler.java
Sat Apr 23 10:46:49 2016
@@ -18,41 +18,18 @@
*/
package org.apache.uima.ducc;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.uima.ducc.ErrorHandlerProgrammability.Key;
import org.apache.uima.ducc.logger.ToLog;
-import org.apache.uima.ducc.user.common.QuotedOptions;
import org.apache.uima.ducc.user.error.iface.Transformer;
-import org.apache.uima.ducc.user.jd.JdUser;
public class ErrorHandler implements IErrorHandler {
- /**
- * These are the System Properties nominally specified in the
- * user's job submission (as -D's for driver_jvm_args) which will
- * be considered for adjusting runtime operational characteristics.
- */
- public static enum SystemPropertyNames {
- JobDriverErrorHandlerMaximumNumberOfTimeoutRetrysPerWorkItem,
- }
-
- /**
- * Return a directive with isKillWorkItem() == false unless and until
- * the maximumNumberOfTimeoutRetrysPerWorkItem is exceeded.
- */
- private AtomicInteger maximumNumberOfTimeoutRetrysPerWorkItem = new
AtomicInteger(0);
-
- /**
- * Flag to insure initialization occurs exactly once.
- */
- private AtomicBoolean alreadyInitialized = new AtomicBoolean(false);
+ private String initializationData = null;
+ private ErrorHandlerProgrammability ehp = null;
/**
* A map comprising an entry for each work item with a corresponding
count
@@ -60,143 +37,42 @@ public class ErrorHandler implements IEr
*/
private ConcurrentHashMap<String,AtomicLong> retryMap = new
ConcurrentHashMap<String,AtomicLong>();
- public enum InitializationDataKey {
- KillJobLimit("max_job_errors"),
- ;
-
- private String altname = null;
-
- private InitializationDataKey() {
- altname = name();
- }
-
- private InitializationDataKey(String value) {
- altname = value;
- }
-
- public String altname() {
- return altname;
- }
- };
-
- private static int DefaultJobErrorLimit = JdUser.DefaultJobErrorLimit;
-
- private AtomicInteger jobErrorLimit = new
AtomicInteger(DefaultJobErrorLimit);
-
+ /**
+ * The number of work item errors encountered for the job
+ */
private AtomicInteger jobErrorCount = new AtomicInteger(0);
public ErrorHandler() {
}
public ErrorHandler(String initializationData) {
- initialize(initializationData);
- }
-
- /**
- * Insure we initialize exactly once
- */
- private void initializeOnce() {
- synchronized(ErrorHandler.class) {
- if(!alreadyInitialized.get()) {
- Properties systemProperties =
System.getProperties();
- initTimeoutRetrys(systemProperties);
- alreadyInitialized.set(true);
- }
- }
+ setInitializationData(initializationData);
}
- /**
- * Use the user specified
-DJobDriverErrorHandlerMaximumNumberOfTimeoutRetrysPerWorkItem to set
- * max timeouts per work item, if specified otherwise 0 meaning no
retrys
- */
- private void initTimeoutRetrys(Properties systemProperties) {
- String key =
SystemPropertyNames.JobDriverErrorHandlerMaximumNumberOfTimeoutRetrysPerWorkItem.name();
- if(systemProperties != null) {
- if(systemProperties.containsKey(key)) {
- String value =
systemProperties.getProperty(key);
- try {
- int integer = Integer.parseInt(value);
- if(integer < 0) {
- String text = "Invalid:
"+key+"="+value;
-
ToLog.info(ErrorHandler.class,text);
- }
- else {
-
maximumNumberOfTimeoutRetrysPerWorkItem.set(integer);;
- String text = "Override:
"+key+"="+maximumNumberOfTimeoutRetrysPerWorkItem.get();
-
ToLog.info(ErrorHandler.class,text);
- }
- }
- catch(Exception e) {
- String text = "Invalid: "+key+"="+value;
- ToLog.info(ErrorHandler.class,text);
- }
- }
- else {
- String text = "Default:
"+key+"="+maximumNumberOfTimeoutRetrysPerWorkItem.get();
- ToLog.info(ErrorHandler.class,text);
- }
- }
- else {
- String text = "Default:
"+key+"="+maximumNumberOfTimeoutRetrysPerWorkItem.get();
- ToLog.info(ErrorHandler.class,text);
- }
+ private void setInitializationData(String value) {
+ // Only remember the raw argument string here; it is not interpreted
+ // until handle() builds the ErrorHandlerProgrammability instance.
+ this.initializationData = value;
 }
- private Map<String, String> parse(String initializationData) {
- Map<String, String> map = new HashMap<String, String>();
- try {
- if(initializationData != null) {
- ArrayList<String> toks =
QuotedOptions.tokenizeList(initializationData, true);
- if(toks != null) {
- for(String tok : toks) {
- String[] split = tok.split("=");
- String key =
split[0].trim().toLowerCase();
- String value = split[1].trim();
- map.put(key, value);
- }
- }
- }
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- return map;
+ private String getInitializationData() {
+ // Hand back the argument string exactly as it was supplied (may be null).
+ return this.initializationData;
 }
@Override
public void initialize(String initializationData) {
- if(initializationData != null) {
- Map<String, String> map = parse(initializationData);
- String key;
- key =
InitializationDataKey.KillJobLimit.name().toLowerCase();
- if(map.containsKey(key)) {
- String value = map.get(key);
- initKillJob(value);
- }
- else {
- String altkey =
InitializationDataKey.KillJobLimit.altname();
- if(map.containsKey(altkey)) {
- String value = map.get(altkey);
- initKillJob(value);
- }
- }
- }
- }
-
- private void initKillJob(String value) {
- try {
- int expect = DefaultJobErrorLimit;
- int update = Integer.parseInt(value);
- jobErrorLimit.compareAndSet(expect, update);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
+ setInitializationData(initializationData);
}
@Override
public IErrorHandlerDirective handle(String serializedCAS, Object
object) {
- initializeOnce();
+ // Do not actually initialize until the first handle situation
+ // arises so as to not create the file ErrorHandler.log unless
+ // a work item error or timeout occurs.
+ synchronized(ErrorHandler.class) {
+ if(ehp == null) {
+ ehp = new
ErrorHandlerProgrammability(getInitializationData());
+ }
+ }
+ // Make ready a default directive for return
ErrorHandlerDirective jdUserDirective = new
ErrorHandlerDirective();
try {
Throwable userThrowable = null;
@@ -209,14 +85,16 @@ public class ErrorHandler implements IEr
userThrowable = (Throwable) object;
userThrowable.getClass();
ToLog.info(ErrorHandler.class,
serializedCAS);
- ToLog.info(ErrorHandler.class,
userThrowable);
+ ToLog.warning(ErrorHandler.class,
userThrowable);
if(serializedCAS != null) {
retryMap.putIfAbsent(serializedCAS, new AtomicLong(0));
AtomicLong retryCount =
retryMap.get(serializedCAS);
long count =
retryCount.incrementAndGet();
- if(count <=
maximumNumberOfTimeoutRetrysPerWorkItem.get()) {
+ Integer
max_timeout_retrys_per_workitem =
ehp.getInteger(Key.max_timeout_retrys_per_workitem);
+ // don't kill work item if
still eligible for timeout retry
+ if(count <=
max_timeout_retrys_per_workitem) {
jdUserDirective.resetKillWorkItem();
- String text = "retry #
"+count+" of "+maximumNumberOfTimeoutRetrysPerWorkItem.get()+" for:
"+serializedCAS;
+ String text = "retry #
"+count+" of "+max_timeout_retrys_per_workitem+" for: "+serializedCAS;
ToLog.info(ErrorHandler.class,text);
}
else {
@@ -238,13 +116,16 @@ public class ErrorHandler implements IEr
else {
jobErrorCount.incrementAndGet();
}
- if(jobErrorCount.get() > jobErrorLimit.get()) {
+ Integer max_job_errors =
ehp.getInteger(Key.max_job_errors);
+ // kill job if max errors limit is surpassed
+ if(jobErrorCount.get() > max_job_errors) {
jdUserDirective.setKillJob();
}
}
catch(Exception e) {
e.printStackTrace();
}
+ // record results in ErrorHandler.log
StringBuffer sb = new StringBuffer();
sb.append("KillJob: ");
sb.append(jdUserDirective.isKillJob());
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java?rev=1740644&view=auto
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
(added)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
Sat Apr 23 10:46:49 2016
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.logger.ToLog;
+
+/**
+ * Interprets the {@code <string>} specified for
+ * {@code --driver_exception_handler_arguments} of the ducc_submit command.
+ *
+ * The string is a blank-delimited list of {@code key=value} pairs, where
+ * each key is one of the {@link Key} names and each value is an integer.
+ * Every key starts out with a built-in default; a well-formed pair replaces
+ * the default, while a malformed pair (unknown key, non-integer value, or
+ * wrong shape) is reported as a warning and otherwise ignored so that the
+ * default remains in effect.
+ */
+public class ErrorHandlerProgrammability {
+
+    /**
+     * Supported keywords for
+     * ducc_submit --driver_exception_handler_arguments {@code <string>}
+     */
+    public enum Key {
+        max_job_errors,
+        max_timeout_retrys_per_workitem,
+        ;
+    }
+
+    /**
+     * The effective setting for each supported key.
+     */
+    private final Map<Key,Integer> map = new ConcurrentHashMap<Key,Integer>();
+
+    /**
+     * Initialize with defaults
+     */
+    {
+        map.put(Key.max_job_errors, Integer.valueOf(15));
+        map.put(Key.max_timeout_retrys_per_workitem, Integer.valueOf(0));
+    }
+
+    /**
+     * Create an instance that uses the default value for every key.
+     */
+    public ErrorHandlerProgrammability() {
+        dumpArgs();
+        dumpMap();
+    }
+
+    /**
+     * @param argString is a blank-delimited String of key=value pairs
+     * with no embedded blanks. Example: keyA=valA keyB=valB...
+     * May be null or blank, in which case the defaults are used.
+     */
+    public ErrorHandlerProgrammability(String argString) {
+        dumpArgs(argString);
+        String[] args = toArgs(argString);
+        override(args);
+        dumpMap();
+    }
+
+    /**
+     * @param args is an array of key=value Strings; may be null
+     */
+    public ErrorHandlerProgrammability(String[] args) {
+        dumpArgs(args);
+        override(args);
+        dumpMap();
+    }
+
+    /**
+     * Split the blank-delimited argument string into key=value tokens.
+     *
+     * @param argString the raw argument string, possibly null
+     * @return the tokens, an empty array for a blank string,
+     * or null when argString is null
+     */
+    private String[] toArgs(String argString) {
+        if(argString == null) {
+            return null;
+        }
+        // Trim first: splitting a string with leading whitespace would
+        // otherwise yield an empty first token that is then reported
+        // as an illegal argument.
+        String trimmed = argString.trim();
+        if(trimmed.length() == 0) {
+            return new String[0];
+        }
+        return trimmed.split("\\s+");
+    }
+
+    /**
+     * Replace the default values in the map with those specified
+     * by the passed in args. Pairs that cannot be interpreted are
+     * logged and skipped, leaving the current value untouched.
+     *
+     * @param args is an array of key=value Strings; may be null
+     */
+    private void override(String[] args) {
+        if(args == null) {
+            return;
+        }
+        for(String arg : args) {
+            if(arg == null || arg.trim().length() == 0) {
+                // nothing to interpret
+                continue;
+            }
+            String[] kvp = arg.split("=");
+            if(kvp.length != 2) {
+                String text = "illegal argument: "+arg;
+                ToLog.warning(ErrorHandler.class,text);
+                continue;
+            }
+            Key key = parseKey(kvp[0].trim());
+            Integer value = parseValue(kvp[1].trim());
+            // parseKey/parseValue have already logged the problem.
+            // The map is a ConcurrentHashMap, which rejects null keys
+            // and values, so a bad pair must not reach map.put().
+            if(key == null || value == null) {
+                continue;
+            }
+            map.put(key,value);
+            String text = "override: "+key.name()+"="+value;
+            ToLog.info(ErrorHandler.class,text);
+        }
+    }
+
+    /**
+     * @param name is one of the expected String
+     * @return Key is the corresponding enum, or null (after logging
+     * a warning) when name is null or not a supported keyword
+     */
+    private Key parseKey(String name) {
+        if(name == null) {
+            String text = "missing argument: "+"<name>=";
+            ToLog.warning(ErrorHandler.class,text);
+            return null;
+        }
+        for(Key k : Key.values()) {
+            if(name.equals(k.name())) {
+                return k;
+            }
+        }
+        String text = "illegal argument: "+name;
+        ToLog.warning(ErrorHandler.class,text);
+        return null;
+    }
+
+    /**
+     * @param value is a String representation of an integer
+     * @return the value as an Integer, or null (after logging a
+     * warning) when value is null or not a valid integer
+     */
+    private Integer parseValue(String value) {
+        if(value == null) {
+            String text = "missing argument: "+"=<value>";
+            ToLog.warning(ErrorHandler.class,text);
+            return null;
+        }
+        try {
+            return Integer.valueOf(Integer.parseInt(value));
+        }
+        catch(NumberFormatException e) {
+            String text = "illegal argument: "+value;
+            ToLog.warning(ErrorHandler.class,text);
+            return null;
+        }
+    }
+
+    /**
+     * @param key the setting to look up; must not be null
+     * @return the effective value for key
+     */
+    public Integer getInteger(Key key) {
+        return map.get(key);
+    }
+
+    // The below methods are for debugging and are nominally silent
+
+    private void dumpArgs() {
+        String text = "args: "+"none";
+        ToLog.debug(ErrorHandler.class,text);
+    }
+
+    public void dumpArgs(String args) {
+        String text = "argString: "+(args == null ? "null" : args);
+        ToLog.debug(ErrorHandler.class,text);
+    }
+
+    public void dumpArgs(String[] args) {
+        if(args == null) {
+            String text = "args: "+"null";
+            ToLog.debug(ErrorHandler.class,text);
+            return;
+        }
+        StringBuilder sb = new StringBuilder();
+        for(String arg : args) {
+            sb.append(arg);
+            sb.append(" ");
+        }
+        String text = "args: "+sb.toString().trim();
+        ToLog.debug(ErrorHandler.class,text);
+    }
+
+    private void dumpMap() {
+        for(Entry<Key, Integer> entry : map.entrySet()) {
+            String key = entry.getKey().name();
+            Integer value = entry.getValue();
+            String text = key+"="+value;
+            ToLog.debug(ErrorHandler.class,text);
+        }
+    }
+}
Propchange:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/ErrorHandlerProgrammability.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java?rev=1740644&r1=1740643&r2=1740644&view=diff
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java
(original)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/logger/ToLog.java
Sat Apr 23 10:46:49 2016
@@ -75,6 +75,8 @@ public class ToLog {
return logger;
}
+ // Note: close to ensure that the lock file is erased
+
private static void close(Logger logger) {
Handler[] handlers = logger.getHandlers();
if(handlers != null) {
@@ -85,7 +87,7 @@ public class ToLog {
}
/**
- * Write a String message into ErrorHandler.log file
+ * Write an "info" String message into ErrorHandler.log file
*/
public static void info(Class<?> clazz, String text) {
if(clazz != null) {
@@ -100,14 +102,44 @@ public class ToLog {
}
/**
+ * Write a "debug" String message into ErrorHandler.log file
+ */
+ public static void debug(Class<?> clazz, String text) {
+   // Nothing to record without both a source class and a message.
+   if(clazz == null || text == null) {
+     return;
+   }
+   Logger logger = getLogger(clazz);
+   if(logger == null) {
+     return;
+   }
+   // "debug" maps onto java.util.logging level FINE.
+   logger.log(Level.FINE, text);
+   close(logger);
+ }
+
+ /**
+ * Write a "warning" String message into ErrorHandler.log file
+ */
+ public static void warning(Class<?> clazz, String text) {
+ if(clazz != null) {
+ if(text != null) {
+ Logger logger = getLogger(clazz);
+ if(logger != null) {
+ logger.log(Level.WARNING, text);
+ close(logger);
+ }
+ }
+ }
+ }
+
+ /**
* Write a Throwable message into ErrorHandler.log file
*/
- public static void info(Class<?> clazz, Throwable t) {
+ public static void warning(Class<?> clazz, Throwable t) {
if(clazz != null) {
if(t != null) {
Logger logger = getLogger(clazz);
if(logger != null) {
- logger.log(Level.INFO, t.getMessage(),
t);
+ logger.log(Level.WARNING,
t.getMessage(), t);
close(logger);
}
}
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java?rev=1740644&r1=1740643&r2=1740644&view=diff
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java
(original)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/test/java/org/apache/uima/ducc/user/jd/test/TestSuite.java
Sat Apr 23 10:46:49 2016
@@ -31,7 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.uima.ducc.ErrorHandler;
-import org.apache.uima.ducc.ErrorHandler.InitializationDataKey;
+import org.apache.uima.ducc.ErrorHandlerProgrammability;
import org.apache.uima.ducc.IErrorHandler;
import org.apache.uima.ducc.IErrorHandlerDirective;
import org.apache.uima.ducc.user.common.ExceptionHelper;
@@ -280,7 +280,7 @@ public class TestSuite {
assertTrue(directive.isKillWorkItem() == true);
//
limit = 10;
- plist =
InitializationDataKey.KillJobLimit.name()+"="+limit;
+ plist =
ErrorHandlerProgrammability.Key.max_job_errors.name()+"="+limit;
eh = new ErrorHandler(plist);
directive = eh.handle(serializedCAS,
getUserException());
for(int i=1; i<limit; i++) {
@@ -295,7 +295,7 @@ public class TestSuite {
assertTrue(directive.isKillWorkItem() == true);
//
limit = 20;
- plist =
InitializationDataKey.KillJobLimit.name()+"="+limit;
+ plist =
ErrorHandlerProgrammability.Key.max_job_errors.name()+"="+limit;
eh = new ErrorHandler(plist);
directive = eh.handle(serializedCAS,
getUserException());
for(int i=1; i<limit; i++) {