Author: degenaro
Date: Thu Aug 18 13:51:36 2016
New Revision: 1756787
URL: http://svn.apache.org/viewvc?rev=1756787&view=rev
Log:
UIMA-5060 DUCC Orchestrator (OR) "hot" restart issues
- Don't record "raw" communication exception in JD log, but replace with
"reporting stopped" and "reporting "resumed".
- Minor code re-org for tidiness and understandability.
Modified:
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java
Modified:
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java?rev=1756787&r1=1756786&r2=1756787&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java
Thu Aug 18 13:51:36 2016
@@ -65,6 +65,7 @@ public class JobDriverStateExchanger ext
private long lastTime = System.currentTimeMillis();
private boolean die = false;
+ private boolean communications_ok = true;
private AtomicInteger getStateReqNo = new AtomicInteger(0);
@@ -141,18 +142,19 @@ public class JobDriverStateExchanger ext
jdc = value;
}
- public void setProcessMap(IDuccProcessMap value) {
+ private void setProcessMap(IDuccProcessMap value) {
dpMap = new DuccProcessMap(value);
}
- public JdReplyEvent request(JdRequestEvent jdRequestEvent) {
+ private JdReplyEvent request(JdRequestEvent jdRequestEvent) throws
Exception {
String location = "request";
JdReplyEvent jdReplyEvent = null;
try {
jdReplyEvent = (JdReplyEvent)
dispatcher.dispatchAndWaitForDuccReply(jdRequestEvent);
}
catch (Exception e) {
- logger.error(location, jobid, e);
+ logger.trace(location, jobid, e);
+ throw e;
}
return jdReplyEvent;
}
@@ -284,42 +286,64 @@ public class JobDriverStateExchanger ext
private boolean isTime() {
String location = "isTime";
boolean retVal = true;
- long currTime = System.currentTimeMillis();
- long elapsedTime = currTime - lastTime;
- logger.debug(location, jobid, "elapsedTime: "+elapsedTime);
- if(elapsedTime < wakeUpMillis) {
- retVal = false;
- sleepTime = wakeUpMillis - elapsedTime;
- }
- else {
- lastTime = currTime;
- sleepTime = wakeUpMillis;
+ try {
+ long currTime = System.currentTimeMillis();
+ long elapsedTime = currTime - lastTime;
+ logger.debug(location, jobid, "elapsedTime:
"+elapsedTime);
+ if(elapsedTime < wakeUpMillis) {
+ retVal = false;
+ sleepTime = wakeUpMillis - elapsedTime;
+ }
+ else {
+ lastTime = currTime;
+ sleepTime = wakeUpMillis;
+ }
+ }
+ catch(Exception e) {
+ logger.error(location, jobid, e);
}
return retVal;
}
+ private void exchange() {
+ String location = "exchange";
+ try {
+ JdRequestEvent jdRequestEvent = getJdRequestEvent();
+ JdReplyEvent jdReplyEvent = request(jdRequestEvent);
+ handle(jdReplyEvent);
+ if(!communications_ok) {
+ logger.warn(location, jobid, "Status reporting
resumed.");
+ communications_ok = true;
+ }
+ }
+ catch(Exception e) {
+ if(communications_ok) {
+ logger.warn(location, jobid, "Status reporting
stopped. Condition may be temporary.");
+ communications_ok = false;
+ }
+ }
+ }
+
+ private void wait_a_while() {
+ String location = "wait_a_while";
+ try {
+ logger.debug(location, jobid, "sleep "+sleepTime/1000);
+ Thread.sleep(sleepTime);
+ }
+ catch(Exception e) {
+ logger.trace(location, jobid, e);
+ }
+ }
+
public void run() {
String location = "run";
- logger.debug(location, jobid, "begin");
+ logger.trace(location, jobid, "begin");
while(!die) {
- try {
- if(isTime()) {
- JdRequestEvent jdRequestEvent =
getJdRequestEvent();
- JdReplyEvent jdReplyEvent =
request(jdRequestEvent);
- handle(jdReplyEvent);
- }
- }
- catch(Exception e) {
- logger.error(location, jobid, e);
- }
- try {
- logger.debug(location, jobid, "sleep
"+sleepTime/1000);
- Thread.sleep(sleepTime);
- }
- catch(Exception e) {
- logger.error(location, jobid, e);
+ if(isTime()) {
+ exchange();
}
+ wait_a_while();
}
- logger.debug(location, jobid, "end");
+ logger.trace(location, jobid, "end");
}
}
Modified:
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java?rev=1756787&r1=1756786&r2=1756787&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java
Thu Aug 18 13:51:36 2016
@@ -140,7 +140,15 @@ public abstract class BaseHttpDispatcher
String response = dispatch(serBody, "text/xml");
return (DuccEvent) fromXml(response);
} catch ( Throwable t ) {
- t.printStackTrace();
+ // Do not print stack trace when subject message event is
JD-STATE.
+ // Instead, simply re-throw the exception for the caller to
handle.
+ switch(duccEvent.getEventType()) {
+ case JD_STATE:
+ throw t;
+ default:
+ t.printStackTrace();
+ break;
+ }
}
return null;
}