Author: degenaro
Date: Thu Aug 18 13:51:36 2016
New Revision: 1756787

URL: http://svn.apache.org/viewvc?rev=1756787&view=rev
Log:
UIMA-5060 DUCC Orchestrator (OR) "hot" restart issues

- Don't record "raw" communication exception in JD log, but replace with 
"reporting stopped" and "reporting "resumed".
- Minor code re-org for tidiness and understandability.

Modified:
    
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java
    
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java?rev=1756787&r1=1756786&r2=1756787&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverStateExchanger.java
 Thu Aug 18 13:51:36 2016
@@ -65,6 +65,7 @@ public class JobDriverStateExchanger ext
        private long lastTime = System.currentTimeMillis();
        
        private boolean die = false;
+       private boolean communications_ok = true;
        
        private AtomicInteger getStateReqNo = new AtomicInteger(0);
        
@@ -141,18 +142,19 @@ public class JobDriverStateExchanger ext
                jdc = value;
        }
        
-       public void setProcessMap(IDuccProcessMap value) {
+       private void setProcessMap(IDuccProcessMap value) {
                dpMap = new DuccProcessMap(value);
        }
        
-       public JdReplyEvent request(JdRequestEvent jdRequestEvent) {
+       private JdReplyEvent request(JdRequestEvent jdRequestEvent) throws 
Exception {
                String location = "request";
                JdReplyEvent jdReplyEvent = null;
                try {
                        jdReplyEvent = (JdReplyEvent) 
dispatcher.dispatchAndWaitForDuccReply(jdRequestEvent);
                } 
                catch (Exception e) {
-                       logger.error(location, jobid, e);
+                       logger.trace(location, jobid, e);
+                       throw e;
                }
                return jdReplyEvent;
        }
@@ -284,42 +286,64 @@ public class JobDriverStateExchanger ext
        private boolean isTime() {
                String location = "isTime";
                boolean retVal = true;
-               long currTime = System.currentTimeMillis();
-               long elapsedTime = currTime - lastTime;
-               logger.debug(location, jobid, "elapsedTime: "+elapsedTime);
-               if(elapsedTime < wakeUpMillis) {
-                       retVal = false;
-                       sleepTime = wakeUpMillis - elapsedTime;
-               }
-               else {
-                       lastTime = currTime;
-                       sleepTime = wakeUpMillis;
+               try {
+                       long currTime = System.currentTimeMillis();
+                       long elapsedTime = currTime - lastTime;
+                       logger.debug(location, jobid, "elapsedTime: 
"+elapsedTime);
+                       if(elapsedTime < wakeUpMillis) {
+                               retVal = false;
+                               sleepTime = wakeUpMillis - elapsedTime;
+                       }
+                       else {
+                               lastTime = currTime;
+                               sleepTime = wakeUpMillis;
+                       }
+               }
+               catch(Exception e) {
+                       logger.error(location, jobid, e);
                }
                return retVal;
        }
        
+       private void exchange() {
+               String location = "exchange";
+               try {
+                       JdRequestEvent jdRequestEvent = getJdRequestEvent();
+                       JdReplyEvent jdReplyEvent = request(jdRequestEvent);
+                       handle(jdReplyEvent);
+                       if(!communications_ok) {
+                               logger.warn(location, jobid, "Status reporting 
resumed.");
+                               communications_ok = true;
+                       }
+               }
+               catch(Exception e) {
+                       if(communications_ok) {
+                               logger.warn(location, jobid, "Status reporting 
stopped.  Condition may be temporary.");
+                               communications_ok = false;
+                       }
+               }
+       }
+       
+       private void wait_a_while() {
+               String location = "wait_a_while";
+               try {
+                       logger.debug(location, jobid, "sleep "+sleepTime/1000);
+                       Thread.sleep(sleepTime);
+               }
+               catch(Exception e) {
+                       logger.trace(location, jobid, e);
+               }
+       }
+       
        public void run() {
                String location = "run";
-               logger.debug(location, jobid, "begin");
+               logger.trace(location, jobid, "begin");
                while(!die) {
-                       try {
-                               if(isTime()) {
-                                       JdRequestEvent jdRequestEvent = 
getJdRequestEvent();
-                                       JdReplyEvent jdReplyEvent = 
request(jdRequestEvent);
-                                       handle(jdReplyEvent);
-                               }
-                       }
-                       catch(Exception e) {
-                               logger.error(location, jobid, e);
-                       }
-                       try {
-                               logger.debug(location, jobid, "sleep 
"+sleepTime/1000);
-                               Thread.sleep(sleepTime);
-                       }
-                       catch(Exception e) {
-                               logger.error(location, jobid, e);
+                       if(isTime()) {
+                               exchange();
                        }
+                       wait_a_while();
                }
-               logger.debug(location, jobid, "end");
+               logger.trace(location, jobid, "end");
        }
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java?rev=1756787&r1=1756786&r2=1756787&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/dispatcher/BaseHttpDispatcher.java
 Thu Aug 18 13:51:36 2016
@@ -140,7 +140,15 @@ public abstract class BaseHttpDispatcher
             String response =  dispatch(serBody, "text/xml");
             return (DuccEvent) fromXml(response);
        } catch ( Throwable t ) { 
-            t.printStackTrace(); 
+               // Do not print stack trace when subject message event is 
JD-STATE.
+               // Instead, simply re-throw the exception for the caller to 
handle.
+               switch(duccEvent.getEventType()) {
+               case JD_STATE:
+                       throw t;
+               default:
+                       t.printStackTrace(); 
+                       break;
+               }
         }
         return null;
     }


Reply via email to