Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
 Wed May  1 13:42:39 2019
@@ -1,20 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+/*                                                                             
                                                                                
                                      
+ * Licensed to the Apache Software Foundation (ASF) under one                  
                                                                                
                                      
+ * or more contributor license agreements.  See the NOTICE file                
                                                                                
                                      
+ * distributed with this work for additional information                       
                                                                                
                                      
+ * regarding copyright ownership.  The ASF licenses this file                  
                                                                                
                                      
+ * to you under the Apache License, Version 2.0 (the                           
                                                                                
                                      
+ * "License"); you may not use this file except in compliance                  
                                                                                
                                      
+ * with the License.  You may obtain a copy of the License at                  
                                                                                
                                      
+ *                                                                             
                                                                                
                                      
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
                                                                                
                                      
+ *                                                                             
                                                                                
                                      
+ * Unless required by applicable law or agreed to in writing,                  
                                                                                
                                      
+ * software distributed under the License is distributed on an                 
                                                                                
                                      
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY                      
                                                                                
                                      
+ * KIND, either express or implied.  See the License for the                   
                                                                                
                                      
+ * specific language governing permissions and limitations                     
                                                                                
                                      
+ * under the License.                                                          
                                                                                
                                      
 */
 package org.apache.uima.ducc.ps.service.transport.http;
 
@@ -30,6 +30,7 @@ import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.http.HttpEntity;
@@ -61,358 +62,408 @@ import org.apache.uima.ducc.ps.service.u
 import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
 public class HttpServiceTransport implements IServiceTransport {
-       private Logger logger =  
UIMAFramework.getLogger(HttpServiceTransport.class);
-       private HttpClient httpClient = null;
-       private PoolingHttpClientConnectionManager cMgr = null;
-       private int clientMaxConnections = 1;
-       private int clientMaxConnectionsPerRoute = 1;
-       private int clientMaxConnectionsPerHostPort = 0;
-       private ReentrantLock registryLookupLock = new ReentrantLock();
-    private long threadSleepTime=1000; // millis
-    private final String nodeIP;
-    private final String nodeName;
-    private final String pid;          
-    private ITargetURI currentTargetUrl = new NoOpTargetURI();
-    private static final  String NA="N/A";
-    private TransportStats stats = new TransportStats();
-    private IRegistryClient registryClient;
-    // holds reference to HttpPost object for every thread. Key=thread id
-    private Map<Long,HttpPost> httpPostMap = 
-               new HashMap<>();
-    private volatile boolean stopping = false;
-       private volatile boolean running = false;
-       private volatile boolean log = true;
-       
-       public HttpServiceTransport(IRegistryClient registryClient, int 
scaleout) throws ServiceException {
-               this.registryClient = registryClient;
-               clientMaxConnections = scaleout;
-
-               
-               if ( Objects.isNull(System.getenv("DUCC_IP")) || 
Objects.isNull(System.getenv("DUCC_NODENAME"))) {
-                       try {
-                               nodeIP = 
InetAddress.getLocalHost().getHostAddress();
-                               
nodeName=InetAddress.getLocalHost().getCanonicalHostName();
-                               
-                       } catch( UnknownHostException e) {
-                               throw new RuntimeException(new 
TransportException("HttpServiceTransport.ctor - Unable to determine Host Name 
and IP",e));
-                       }
-                       
-               } else {
-                       // Use agent provided node identity. This is important 
when running in a sim mode
-                       // where nodes are virtual.
-                       nodeIP =  System.getenv("DUCC_IP");
-                       nodeName = System.getenv("DUCC_NODENAME");
-
-               }
-               pid = getProcessIP(NA);
-       }
-       private HttpPost getPostMethodForCurrentThread() {
-               HttpPost postMethod;
-               if ( !httpPostMap.containsKey(Thread.currentThread().getId())) {
-                       // each thread needs its own PostMethod
-                       postMethod =
-                           new HttpPost(currentTargetUrl.asString());
-                       
httpPostMap.put(Thread.currentThread().getId(),postMethod);
-               } else {
-                       postMethod = 
httpPostMap.get(Thread.currentThread().getId());
-               }
-               return postMethod;
-       }
-       private String getProcessIP(final String fallback) {
-               // the following code returns '<pid>@<hostname>'
-               String name = ManagementFactory.getRuntimeMXBean().getName();
-               int pos = name.indexOf('@');
-
-               if (pos < 1) {
-                       // pid not found
-                       return fallback;
-               }
-
-               try {
-                       return Long.toString(Long.parseLong(name.substring(0, 
pos)));
-               } catch (NumberFormatException e) {
-                       // ignore
-               }
-               return fallback;
-       }
-       private void lookupNewTarget() {
-               registryLookupLock.lock();
-               while( !stopping ) {
-                       try {
-                               String newTarget = 
registryClient.lookUp(currentTargetUrl.asString());
-                               currentTargetUrl = 
TargetURIFactory.newTarget(newTarget);
-                               break;
-                       } catch(  Exception e) {
-                               synchronized (httpClient) {
-                                       
-                                       try {
-                                               
httpClient.wait(threadSleepTime);
-                                       } catch( InterruptedException ex) {
-                                               
Thread.currentThread().interrupt();
-                                               break;
-                                       }
-                               }
-                       }
-               }
-               if (registryLookupLock.isHeldByCurrentThread()) {
-                       registryLookupLock.unlock();
-               }
-       }
-       public void addRequestorInfo(IMetaTaskTransaction transaction) {
-       transaction.setRequesterAddress(nodeIP);
-       transaction.setRequesterNodeName(nodeName);
-       transaction.setRequesterProcessId(Integer.valueOf(pid));
-       transaction.setRequesterThreadId((int)Thread.currentThread().getId());
-       if ( logger.isLoggable(Level.FINE )) {
-               logger.log(Level.FINE,"ip:"+transaction.getRequesterAddress());
-               logger.log(Level.FINE, 
"nodeName:"+transaction.getRequesterNodeName());
-               logger.log(Level.FINE, 
"processName:"+transaction.getRequesterProcessName());
-               
logger.log(Level.FINE,"processId:"+transaction.getRequesterProcessId());
-               logger.log(Level.FINE, 
"threadId:"+transaction.getRequesterThreadId());
-
-       }
-               
-       }
-       public void initialize() throws ServiceInitializationException { 
-               
-               // use plugged in registry to lookup target to connect to.
-               // Sets global: currentTarget
-               lookupNewTarget();
-               
-               cMgr = new PoolingHttpClientConnectionManager();
-
-               if (clientMaxConnections > 0) {
-                       cMgr.setMaxTotal(clientMaxConnections);
-               }
-               // Set default max connections per route
-               if (clientMaxConnectionsPerRoute > 0) {
-                       
cMgr.setDefaultMaxPerRoute(clientMaxConnectionsPerRoute);
-               }
-               HttpHost httpHost = new HttpHost(currentTargetUrl.asString(), 
Integer.valueOf(currentTargetUrl.getPort()), currentTargetUrl.getContext());
-               if (clientMaxConnectionsPerHostPort > 0) {
-                       cMgr.setMaxPerRoute(new HttpRoute(httpHost), 
clientMaxConnectionsPerHostPort);
-               }
-               
-               httpClient = 
HttpClients.custom().setConnectionManager(cMgr).build();
-               running = true;
-
-       }
-    private void addCommonHeaders( HttpPost method ) {
-       synchronized( HttpServiceTransport.class ) {
-               
-                method.setHeader("IP", nodeIP);
-             method.setHeader("Hostname", nodeName);
-             method.setHeader("ThreadID",
-                             String.valueOf(Thread.currentThread().getId()));
-             method.setHeader("PID", pid);
-               
-       }
-               
-    }
-
-       private HttpEntity wrapRequest(String serializedRequest) {
-               return new StringEntity(serializedRequest, 
ContentType.APPLICATION_XML);
-       }
-
-       private boolean isRunning() {
-               return running;
-       }
-
-       private IMetaTaskTransaction retryUntilSuccessfull(String request, 
HttpPost postMethod) {
-               IMetaTaskTransaction response=null;
-
-               // retry until service is stopped
-               while (isRunning()) {
-                       try {
-                               response =  doPost(postMethod);
-                               break;
-
-                       } catch (TransportException | IOException | 
URISyntaxException exx) {
-                               try {
-                                       Thread.sleep(threadSleepTime);
-                               } catch( InterruptedException e) {
-                                       Thread.currentThread().interrupt();
-                               }
-                       }
-                       lookupNewTarget();
-                        
-               }
-               return response;
-               
-       }
-       private IMetaTaskTransaction doPost(HttpPost postMethod) throws 
URISyntaxException, IOException, TransportException {
-               postMethod.setURI(new URI(currentTargetUrl.asString()));
-
-               IMetaTaskTransaction metaTransaction=null;
-               HttpResponse response = httpClient.execute(postMethod);
-               //              if ( stopping ) {
-               //      throw new TransportException("Service stopping - 
rejecting request");
-               //}
-               HttpEntity entity = response.getEntity();
-               String serializedResponse = EntityUtils.toString(entity);
-               Object transaction=null;
-               try {
-                       transaction = 
XStreamUtils.unmarshall(serializedResponse);
-               } catch(Exception e) {
-                       logger.log(Level.WARNING,"Process 
Thread:"+Thread.currentThread().getId()+" Error while deserializing response 
with XStream",e);
-                       throw new TransportException(e);
-               }
-               if ( Objects.isNull(transaction)) {
-                       throw new InvalidClassException(
-                                       "Expected IMetaTaskTransaction - 
Instead Received NULL");
-
-               } else if ( !(transaction instanceof IMetaTaskTransaction) ) { 
-                       throw new InvalidClassException(
-                                       "Expected IMetaTaskTransaction - 
Instead Received " + transaction.getClass().getName());
-               }
-               metaTransaction = (IMetaTaskTransaction) transaction;
-               
-               StatusLine statusLine = response.getStatusLine();
-               if (statusLine.getStatusCode() != 200 ) {
-                       // all IOExceptions are retried
-                       throw new IOException(
-                                       "Unexpected HttpClient response 
status:"+statusLine+ " Content causing error:"+serializedResponse);
-               }
-
-               stats.incrementSuccessCount();
-               return metaTransaction;
-       }
-       /**
-        * Dispatches request to remote driver via doPost(). Its synchronized 
to prevent over-running the driver with
-        * requests from multiple threads. When the transport fails sending 
GET/ACK/END a single thread
-        * will try to recover connection and send the request.
-        * 
-        */
-       @Override
-       public synchronized IMetaTaskTransaction dispatch(String 
serializedRequest) throws TransportException  {
-           //if ( stopping ) {
-           //          throw new IllegalStateException("Service transport has 
been stopped, unable to dispatch request");
-           //  }
-               IMetaTaskTransaction transaction=null;
-               HttpEntity e = wrapRequest(serializedRequest);
-               // Each thread has its own HttpPost method. If current thread
-               // doesnt have one, it will be created and added to the local
-               // Map. Subsequent requests will fetch it from the map using
-               // current thread ID as a key.
-               HttpPost postMethod = getPostMethodForCurrentThread();
-               addCommonHeaders(postMethod);
-               postMethod.setEntity(e);
-               try {
-                       String simulatedException;
-                       // To test transport errors add command line option 
-DMockHttpPostError=exception where
-                       // exception is one of the following Strings:
-                       //
-                       // IOException, 
-                       // SocketException, 
-                       // UnknownHostException, 
-                       // NoRouteToHostException,
-                       // NoHttpResponseException, 
-                       // HttpHostConnectException, 
-                       // URISyntaxException
-                       // Use JUnit test JunitTransoirtTestCase to test the 
above errors
-                       
-                       if ( ( simulatedException = 
System.getProperty("MockHttpPostError")) != null ) {
-                               HttpClientExceptionGenerator 
mockExceptionGenerator = 
-                                               new 
HttpClientExceptionGenerator(simulatedException);
-                               
mockExceptionGenerator.throwSimulatedException();
-                       } else {
-                               transaction = doPost(postMethod);
-                       }
-               } catch( IOException | URISyntaxException ex) {
-                       if ( stopping ) {
-                               // looks like the process is in the shutdown 
mode. Log an exception and dont retry
-                               logger.log(Level.INFO,"Process 
Thread:"+Thread.currentThread().getId()+" - Process is already stopping - 
Caught Exception while calling doPost() \n"+ex);
-                               throw new TransportException(ex);
-                       } else {
-                               if ( log ) {
-                                       log = false;
-                                       stats.incrementErrorCount();
-                                       logger.log(Level.WARNING, 
this.getClass().getName()+".dispatch() >>>>>>>>>> Handling Exception \n"+ex);
-                                       logger.log(Level.INFO, ">>>>>>>>>> 
Unable to communicate with target:"+currentTargetUrl.asString()+" - retrying 
until successfull - with "+threadSleepTime/1000+" seconds wait between retries  
");
-                               }
-                               transaction = 
retryUntilSuccessfull(serializedRequest, postMethod);
-                               log = true;
-                               logger.log(Level.INFO, "Established connection 
to target:"+currentTargetUrl.asString());
-                       }
-
-
-               } finally {
-                       postMethod.releaseConnection();
-               }
-               return transaction;
-               
-       }
-       
-       public void stop(boolean quiesce) {
-
-               stopping = true;
-               running = false;
-               // Use System.out since the logger's ShutdownHook may have 
closed streams
-               System.out.println(Utils.getTimestamp()+">>>>>>> 
"+Utils.getShortClassname(this.getClass())+" stop() called - 
mode:"+(quiesce==true?"quiesce":"stop"));
-               logger.log(Level.INFO,this.getClass().getName()+" stop() 
called");
-               if ( !quiesce && cMgr != null ) {
-                       cMgr.shutdown();
-                       System.out.println(Utils.getTimestamp()+">>>>>>> 
"+Utils.getShortClassname(this.getClass())+" stopped connection mgr");
-                       logger.log(Level.INFO,this.getClass().getName()+" 
stopped connection mgr");
-
-               }
-       }
-       public static void main(String[] args) {
-
-       }
-
-       public static class HttpClientExceptionGenerator {
-               public enum ERROR{ IOException, SocketException, 
UnknownHostException, NoRouteToHostException,NoHttpResponseException, 
HttpHostConnectException, URISyntaxException};
-               
-               Exception exceptionClass=null;
-               
-               public HttpClientExceptionGenerator(String exc) {
-                       
-                       for( ERROR e : ERROR.values()) {
-                               if ( exc != null && e.name().equals(exc)) {
-                                       switch(e) {
-                                       case IOException:
-                                               exceptionClass = new 
IOException("Simulated IOException");
-                                               break;
-                                       case URISyntaxException:
-                                               exceptionClass = new 
URISyntaxException("", "Simulated URISyntaxException");
-                                               break;
-                                       case NoRouteToHostException:
-                                               exceptionClass = new 
NoRouteToHostException("Simulated NoRouteToHostException");
-                                               break;
-                                       case NoHttpResponseException:
-                                               exceptionClass = new 
NoHttpResponseException("Simulated NoHttpResponseException");
-                                               break;  
-                                       case SocketException:
-                                               exceptionClass = new 
SocketException("Simulated SocketException");
-                                               break;
-                                       case UnknownHostException:
-                                               exceptionClass = new 
UnknownHostException("Simulated UnknownHostException");
-                                               break;
-                                               
-                                       default:
-                                               
-                                                       
-                                       }
-                                       if ( exceptionClass != null ) {
-                                               break;
-                                       }
-                               }
-                       }
-               }
-               public void throwSimulatedException() throws IOException, 
URISyntaxException {
-                       if ( exceptionClass != null ) {
-                               if ( exceptionClass instanceof IOException ) {
-                                       throw (IOException)exceptionClass;
-                               } else if ( exceptionClass instanceof 
URISyntaxException ) {
-                                       throw 
(URISyntaxException)exceptionClass;
-                               }
-                               
-                       }
-               }
-               
-               
-       }
+  private Logger logger = UIMAFramework.getLogger(HttpServiceTransport.class);
+
+  private HttpClient httpClient = null;
+
+  private PoolingHttpClientConnectionManager cMgr = null;
+
+  private int clientMaxConnections = 1;
+
+  private int clientMaxConnectionsPerRoute = 1;
+
+  private int clientMaxConnectionsPerHostPort = 0;
+
+  private ReentrantLock registryLookupLock = new ReentrantLock();
+
+  private long threadSleepTime = 1000; // millis
+
+  private final String nodeIP;
+
+  private final String nodeName;
+
+  private final String pid;
+
+  private ITargetURI currentTargetUrl = new NoOpTargetURI();
+
+  private static final String NA = "N/A";
+
+  private TransportStats stats = new TransportStats();
+
+  private IRegistryClient registryClient;
+
+  // holds reference to HttpPost object for every thread. Key=thread id
+  private Map<Long, HttpPost> httpPostMap = new HashMap<>();
+
+  private volatile boolean stopping = false;
+
+  private volatile boolean running = false;
+
+  private volatile boolean log = true;
+
+  private AtomicLong xstreamTime = new AtomicLong();
+  // private ThreadLocal<HashMap<Long, XStream>> localXStream = new 
ThreadLocal<HashMap<Long,
+  // XStream>>() {
+  // @Override
+  // protected HashMap<Long, XStream> initialValue() {
+  // return new HashMap<Long, XStream>();
+  // }
+  // };
+
+  public HttpServiceTransport(IRegistryClient registryClient, int scaleout)
+          throws ServiceException {
+    this.registryClient = registryClient;
+    clientMaxConnections = scaleout;
+
+    try {
+      nodeIP = InetAddress.getLocalHost().getHostAddress();
+      nodeName = InetAddress.getLocalHost().getCanonicalHostName();
+      pid = getProcessIP(NA);
+    } catch (UnknownHostException e) {
+      throw new RuntimeException(new TransportException(
+              "HttpServiceTransport.ctor - Unable to determine Host Name and 
IP", e));
+    }
+
+  }
+
+  private HttpPost getPostMethodForCurrentThread() {
+    HttpPost postMethod;
+    if (!httpPostMap.containsKey(Thread.currentThread().getId())) {
+      // each thread needs its own PostMethod
+      postMethod = new HttpPost(currentTargetUrl.asString());
+      httpPostMap.put(Thread.currentThread().getId(), postMethod);
+    } else {
+      postMethod = httpPostMap.get(Thread.currentThread().getId());
+    }
+    return postMethod;
+  }
+
+  private String getProcessIP(final String fallback) {
+    // the following code returns '<pid>@<hostname>'
+    String name = ManagementFactory.getRuntimeMXBean().getName();
+    int pos = name.indexOf('@');
+
+    if (pos < 1) {
+      // pid not found
+      return fallback;
+    }
+
+    try {
+      return Long.toString(Long.parseLong(name.substring(0, pos)));
+    } catch (NumberFormatException e) {
+      // ignore
+    }
+    return fallback;
+  }
+
+  private void lookupNewTarget() {
+    registryLookupLock.lock();
+    while (!stopping) {
+      try {
+        String newTarget = registryClient.lookUp(currentTargetUrl.asString());
+        currentTargetUrl = TargetURIFactory.newTarget(newTarget);
+        break;
+      } catch (Exception e) {
+        synchronized (httpClient) {
+
+          try {
+            httpClient.wait(threadSleepTime);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+    }
+    if (registryLookupLock.isHeldByCurrentThread()) {
+      registryLookupLock.unlock();
+    }
+  }
+
+  public void addRequestorInfo(IMetaTaskTransaction transaction) {
+    transaction.setRequesterAddress(nodeIP);
+    transaction.setRequesterNodeName(nodeName);
+    transaction.setRequesterProcessId(Integer.valueOf(pid));
+    transaction.setRequesterThreadId((int) Thread.currentThread().getId());
+    if (logger.isLoggable(Level.FINE)) {
+      logger.log(Level.FINE, "ip:" + transaction.getRequesterAddress());
+      logger.log(Level.FINE, "nodeName:" + transaction.getRequesterNodeName());
+      logger.log(Level.FINE, "processName:" + 
transaction.getRequesterProcessName());
+      logger.log(Level.FINE, "processId:" + 
transaction.getRequesterProcessId());
+      logger.log(Level.FINE, "threadId:" + transaction.getRequesterThreadId());
+
+    }
+
+  }
+
+  public void initialize() throws ServiceInitializationException {
+
+    // use plugged in registry to lookup target to connect to.
+    // Sets global: currentTarget
+    lookupNewTarget();
+
+    cMgr = new PoolingHttpClientConnectionManager();
+
+    if (clientMaxConnections > 0) {
+      cMgr.setMaxTotal(clientMaxConnections);
+    }
+    // Set default max connections per route
+    if (clientMaxConnectionsPerRoute > 0) {
+      cMgr.setDefaultMaxPerRoute(clientMaxConnectionsPerRoute);
+    }
+    HttpHost httpHost = new HttpHost(currentTargetUrl.asString(),
+            Integer.valueOf(currentTargetUrl.getPort()), 
currentTargetUrl.getContext());
+    if (clientMaxConnectionsPerHostPort > 0) {
+      cMgr.setMaxPerRoute(new HttpRoute(httpHost), 
clientMaxConnectionsPerHostPort);
+    }
+
+    httpClient = HttpClients.custom().setConnectionManager(cMgr).build();
+    running = true;
+
+  }
+
+  private void addCommonHeaders(HttpPost method) {
+    // synchronized( HttpServiceTransport.class ) {
+
+    method.setHeader("IP", nodeIP);
+    method.setHeader("Hostname", nodeName);
+    method.setHeader("ThreadID", 
String.valueOf(Thread.currentThread().getId()));
+    method.setHeader("PID", pid);
+
+    // }
+
+  }
+
+  private HttpEntity wrapRequest(String serializedRequest) {
+    return new StringEntity(serializedRequest, ContentType.APPLICATION_XML);
+  }
+
+  private boolean isRunning() {
+    return running;
+  }
+
+  private IMetaTaskTransaction retryUntilSuccessfull(String request, HttpPost 
postMethod,
+          ThreadLocal<HashMap<Long, XStream>> localXStream) {
+    IMetaTaskTransaction response = null;
+
+    // retry until service is stopped
+    while (isRunning()) {
+      try {
+        response = doPost(postMethod, localXStream);
+        break;
+
+      } catch (TransportException | IOException | URISyntaxException exx) {
+        try {
+          Thread.sleep(threadSleepTime);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      lookupNewTarget();
+
+    }
+    return response;
+
+  }
+
+  private IMetaTaskTransaction doPost(HttpPost postMethod,
+          ThreadLocal<HashMap<Long, XStream>> localXStream)
+          throws URISyntaxException, IOException, TransportException {
+    postMethod.setURI(new URI(currentTargetUrl.asString()));
+
+    IMetaTaskTransaction metaTransaction = null;
+    HttpResponse response = httpClient.execute(postMethod);
+    if (stopping) {
+      throw new TransportException("Service stopping - rejecting request");
+    }
+    HttpEntity entity = response.getEntity();
+    String serializedResponse = EntityUtils.toString(entity);
+    Object transaction = null;
+    try {
+      long t1 = System.currentTimeMillis();
+      // transaction = XStreamUtils.unmarshall(serializedResponse);
+      transaction = localXStream.get().get(Thread.currentThread().getId())
+              .fromXML(serializedResponse);
+      xstreamTime.addAndGet((System.currentTimeMillis() - t1));
+    } catch (Exception e) {
+      logger.log(Level.WARNING, "Process Thread:" + 
Thread.currentThread().getId()
+              + " Error while deserializing response with XStream", e);
+      throw new TransportException(e);
+    }
+    if (Objects.isNull(transaction)) {
+      throw new InvalidClassException("Expected IMetaTaskTransaction - Instead 
Received NULL");
+
+    } else if (!(transaction instanceof IMetaTaskTransaction)) {
+      throw new InvalidClassException("Expected IMetaTaskTransaction - Instead 
Received "
+              + transaction.getClass().getName());
+    }
+    metaTransaction = (IMetaTaskTransaction) transaction;
+
+    StatusLine statusLine = response.getStatusLine();
+    if (statusLine.getStatusCode() != 200) {
+      // all IOExceptions are retried
+      throw new IOException("Unexpected HttpClient response status:" + 
statusLine
+              + " Content causing error:" + serializedResponse);
+    }
+
+    stats.incrementSuccessCount();
+    return metaTransaction;
+  }
+
+  /**
+   * Dispatches request to remote driver via doPost(). Its synchronized to 
prevent over-running the
+   * driver with requests from multiple threads. When the transport fails 
sending GET/ACK/END a
+   * single thread will try to recover connection and send the request.
+   * 
+   */
+  @Override
+  public IMetaTaskTransaction dispatch(String serializedRequest,
+          ThreadLocal<HashMap<Long, XStream>> localXStream) throws 
TransportException {
+    if (stopping) {
+      throw new IllegalStateException(
+              "Service transport has been stopped, unable to dispatch 
request");
+    }
+    IMetaTaskTransaction transaction = null;
+    HttpEntity e = wrapRequest(serializedRequest);
+    // Each thread has its own HttpPost method. If current thread
+    // doesnt have one, it will be created and added to the local
+    // Map. Subsequent requests will fetch it from the map using
+    // current thread ID as a key.
+    HttpPost postMethod = getPostMethodForCurrentThread();
+    addCommonHeaders(postMethod);
+    postMethod.setEntity(e);
+    try {
+      String simulatedException;
+      // To test transport errors add command line option
+      // -DMockHttpPostError=exception where
+      // exception is one of the following Strings:
+      //
+      // IOException,
+      // SocketException,
+      // UnknownHostException,
+      // NoRouteToHostException,
+      // NoHttpResponseException,
+      // HttpHostConnectException,
+      // URISyntaxException
+      // Use JUnit test JunitTransoirtTestCase to test the above errors
+
+      if ((simulatedException = System.getProperty("MockHttpPostError")) != 
null) {
+        HttpClientExceptionGenerator mockExceptionGenerator = new 
HttpClientExceptionGenerator(
+                simulatedException);
+        mockExceptionGenerator.throwSimulatedException();
+      } else {
+        transaction = doPost(postMethod, localXStream);
+      }
+    } catch (IOException | URISyntaxException ex) {
+      if (stopping) {
+        // looks like the process is in the shutdown mode. Log an exception 
and dont
+        // retry
+        logger.log(Level.INFO, "Process Thread:" + 
Thread.currentThread().getId()
+                + " - Process is already stopping - Caught Exception while 
calling doPost() \n"
+                + ex);
+        throw new TransportException(ex);
+      } else {
+        if (log) {
+          log = false;
+          stats.incrementErrorCount();
+          logger.log(Level.WARNING,
+                  this.getClass().getName() + ".dispatch() >>>>>>>>>> Handling 
Exception \n" + ex);
+          logger.log(Level.INFO,
+                  ">>>>>>>>>> Unable to communicate with target:" + 
currentTargetUrl.asString()
+                          + " - retrying until successfull - with " + 
threadSleepTime / 1000
+                          + " seconds wait between retries  ");
+        }
+        transaction = retryUntilSuccessfull(serializedRequest, postMethod, 
localXStream);
+        log = true;
+        logger.log(Level.INFO, "Established connection to target:" + 
currentTargetUrl.asString());
+      }
+
+    } finally {
+      postMethod.releaseConnection();
+    }
+    return transaction;
+
+  }
+
+  public void stop(boolean quiesce) {
+
+    stopping = true;
+    running = false;
+    // Use System.out since the logger's ShutdownHook may have closed streams
+    System.out.println(Utils.getTimestamp() + ">>>>>>> " + 
Utils.getShortClassname(this.getClass())
+            + " stop() called - mode:" + (quiesce == true ? "quiesce" : 
"stop"));
+    logger.log(Level.INFO, this.getClass().getName() + " stop() called");
+    System.out.println(" ########################################3 Total time 
in XStream:"
+            + (xstreamTime.get() / 1000) + " secs");
+    if (!quiesce && cMgr != null) {
+      cMgr.shutdown();
+      System.out.println(Utils.getTimestamp() + ">>>>>>> "
+              + Utils.getShortClassname(this.getClass()) + " stopped 
connection mgr");
+      logger.log(Level.INFO, this.getClass().getName() + " stopped connection 
mgr");
+
+    }
+  }
+
+  public static void main(String[] args) {
+
+  }
+
+  public static class HttpClientExceptionGenerator {
+    public enum ERROR {
+      IOException, SocketException, UnknownHostException, 
NoRouteToHostException, NoHttpResponseException, HttpHostConnectException, 
URISyntaxException
+    };
+
+    Exception exceptionClass = null;
+
+    public HttpClientExceptionGenerator(String exc) {
+
+      for (ERROR e : ERROR.values()) {
+        if (exc != null && e.name().equals(exc)) {
+          switch (e) {
+            case IOException:
+              exceptionClass = new IOException("Simulated IOException");
+              break;
+            case URISyntaxException:
+              exceptionClass = new URISyntaxException("", "Simulated 
URISyntaxException");
+              break;
+            case NoRouteToHostException:
+              exceptionClass = new NoRouteToHostException("Simulated 
NoRouteToHostException");
+              break;
+            case NoHttpResponseException:
+              exceptionClass = new NoHttpResponseException("Simulated 
NoHttpResponseException");
+              break;
+            case SocketException:
+              exceptionClass = new SocketException("Simulated 
SocketException");
+              break;
+            case UnknownHostException:
+              exceptionClass = new UnknownHostException("Simulated 
UnknownHostException");
+              break;
+
+            default:
+
+          }
+          if (exceptionClass != null) {
+            break;
+          }
+        }
+      }
+    }
+
+    public void throwSimulatedException() throws IOException, 
URISyntaxException {
+      if (exceptionClass != null) {
+        if (exceptionClass instanceof IOException) {
+          throw (IOException) exceptionClass;
+        } else if (exceptionClass instanceof URISyntaxException) {
+          throw (URISyntaxException) exceptionClass;
+        }
+
+      }
+    }
+
+  }
 
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
 Wed May  1 13:42:39 2019
@@ -21,6 +21,12 @@ package org.apache.uima.ducc.ps;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -34,6 +40,7 @@ import org.apache.uima.cas.CAS;
 import org.apache.uima.ducc.ps.net.iface.IMetaTask;
 import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
 import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Direction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type;
 import org.apache.uima.ducc.ps.net.impl.MetaTask;
 import org.apache.uima.ducc.ps.service.transport.XStreamUtils;
 import org.apache.uima.ducc.ps.service.utils.UimaSerializer;
@@ -48,243 +55,441 @@ import org.eclipse.jetty.servlet.Servlet
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.junit.After;
 
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
 public class Client {
-       private Server server;
-       private boolean block = false;
-       private AtomicLong errorCount = new AtomicLong();
-       private final static String app="test";
-       private int httpPort = 12222;
-       private int maxThreads = 50;
-       private static UimaSerializer uimaSerializer = new UimaSerializer();
-       private AtomicInteger correlationIdCounter = 
-                       new AtomicInteger(0);
-       private AtomicInteger atomicCounter =
-                       new AtomicInteger(1);
-       private AtomicInteger atomicErrorCounter =
-                       new AtomicInteger(10);
-       private volatile boolean noMoreErrors = false;
-       protected String getApp() {
-               return app;
-       }
-       protected int getJettyPort() {
-               while(true) {
-                       ServerSocket socket=null;
-                       try {
-                               socket = new ServerSocket(httpPort);
-                               break;
-                       } catch( IOException e) {
-                               httpPort++;
-                       } finally {
-                               if ( socket != null ) {
-                                       try {
-                                               socket.close();
-                                       } catch( Exception ee) {}
-                                       
-                               }
-                       }
-               }
-               return httpPort;
-       }
-       protected int getPort() {
-
-               return httpPort;
-       }
-          
-       public void startJetty(boolean block) throws Exception {
-               this.block = block;
-               
-               
-                       QueuedThreadPool threadPool = new QueuedThreadPool();
-                       if (maxThreads < threadPool.getMinThreads()) {
-                               System.out.println(
-                               "Invalid value for jetty 
MaxThreads("+maxThreads+") - it should be greater or equal to 
"+threadPool.getMinThreads()+". Defaulting to 
jettyMaxThreads="+threadPool.getMaxThreads());
-                               
threadPool.setMaxThreads(threadPool.getMinThreads());
-                       } else {
-                               threadPool.setMaxThreads(maxThreads);
-                       }
-
-                   server = new Server(threadPool);
-
-                       // Server connector
-                       ServerConnector connector = new ServerConnector(server);
-                       connector.setPort(getJettyPort());
-                       server.setConnectors(new Connector[] { connector });
-               System.out.println("launching Jetty on 
Port:"+connector.getPort());
-                       ServletContextHandler context = new 
ServletContextHandler(
-                                       ServletContextHandler.SESSIONS);
-                       context.setContextPath("/");
-                       server.setHandler(context);
-
-                       context.addServlet(new ServletHolder(new 
TaskHandlerServlet()), "/"+app);
-               
-               
-                       server.start();
-               System.out.println("Jetty Started - Waiting for Messages ...");
-           }
-
-           @After
-           public void stopJetty()
-           {
-               try
-               {
-                       if ( server != null ) {
-                               UIMAFramework.getLogger().log(Level.INFO, 
"Stopping Jetty");
-                           server.stop();
-                               
-                       }
-               }
-               catch (Exception e)
-               {
-                   e.printStackTrace();
-               }
-               UIMAFramework.getLogger().log(Level.INFO,"Jetty Stopped");
-           }
-               public class TaskHandlerServlet extends HttpServlet {
-                       private static final long serialVersionUID = 1L;
-
-                       public TaskHandlerServlet() {
-                       }
-
-                       protected void doPost(HttpServletRequest request,
-                                       HttpServletResponse response) throws 
ServletException,
-                                       IOException {
-                               try {
-                                       //System.out.println("Handling HTTP 
Post Request");
-                                       //long post_stime = System.nanoTime();
-                                       StringBuilder sb = new StringBuilder();
-                                       BufferedReader reader = 
request.getReader();
-                                       String line;
-                                       while ((line = reader.readLine()) != 
null) {
-                                               sb.append(line);
-                                       }
-                                       String content = sb.toString().trim();
-
-                                       //System.out.println( "Http Request 
Body:::"+String.valueOf(content));
-                                       
-                                       
-                                String nodeIP = request.getHeader("IP");
-                            String nodeName = request.getHeader("Hostname");
-                            String threadID = request.getHeader("ThreadID");
-                            String pid = request.getHeader("PID");
-                                       System.out.println( "Sender ID:::Node 
IP"+nodeIP+" Node Name:"+nodeName+" PID:"+pid+" ThreadID:"+threadID);
-
-                                       IMetaTaskTransaction imt = null;
-
-                                       imt = (IMetaTaskTransaction) 
XStreamUtils.unmarshall(content);
-                                       IMetaTaskTransaction.Type type = 
imt.getType();
-                                       switch(type) {
-                                       case Get:
-                                               System.out.println("---- Driver 
handling GET Request -- Thread:"+Thread.currentThread().getId());
-                                               
imt.setMetaTask(getMetaMetaCas());
-                                               
imt.getMetaTask().setAppData("CorrelationID-"+correlationIdCounter.incrementAndGet());
-                                               if ( 
System.getProperty("simulate.no.work") == null || noMoreErrors) {
-                                                       
imt.getMetaTask().setUserSpaceTask(getSerializedCAS());
-                                               } else {
-                                                       
System.out.println("---- Driver handling GET Request -- Client Out of Tasks -- 
Thread:"+Thread.currentThread().getId());
-                                                       
imt.getMetaTask().setUserSpaceTask(null);
-                                                       if ( 
atomicErrorCounter.decrementAndGet() == 0 ) {
-                                                               noMoreErrors = 
true;
-                                                       }
-                                               }
-                                       //      
handleMetaCasTransationGet(trans, taskConsumer);
-                                               break;
-                                       case Ack:
-                                               System.out.println("---- Driver 
handling ACK Request - ");
-                                               
//handleMetaCasTransationAck(trans, taskConsumer);
-                                               break;
-                                       case End:
-                                               System.out.println("---- Driver 
handling END Request - "+imt.getMetaTask().getAppData());
-                                               
//handleMetaCasTransationEnd(trans, taskConsumer);
-                                               if ( 
imt.getMetaTask().getUserSpaceException() != null ) {
-                                                       
System.out.println("Client received error#"+errorCount.incrementAndGet());
-                                               }
-                                               break;
-                                       case InvestmentReset:
-                                       //      
handleMetaCasTransationInvestmentReset(trans, rwt);
-                                               break;
-                                       default:
-                                               break;
-                                       }
-                                       // process service request
-                                       //taskProtocolHandler.handle(imt);
-
-                                       //long marshall_stime = 
System.nanoTime();
-                                       // setup reply
-                                       
-                                       imt.setDirection(Direction.Response);
-
-                                       
response.setStatus(HttpServletResponse.SC_OK);
-
-                                       response.setHeader("content-type", 
"text/xml");
-                                       String body = 
XStreamUtils.marshall(imt);
-
-                                       if (block ) {
-                                               synchronized(this) {
-                                                       this.wait(0);
-                                               }
-                                               
-                                       }
-                                       System.out.println("Sending response");
-                                       response.getWriter().write(body);
-                                       
-                                       
-                                       //response.getWriter().write(content);
-                               } catch( InterruptedException e) {
-                                       Thread.currentThread().interrupt();
-                               }
-                               catch (Throwable e) {
-                                       e.printStackTrace();
-                                       throw new ServletException(e);
-                               }
-                       }
-
-               }
-               public long getErrorCount() {
-                       return errorCount.get();
-               }
-               private IMetaTask getMetaCas(String serializedCas) {
-                       if ( serializedCas == null ) {
-                               return null;
-                       }
-                       return new MetaTask(atomicCounter.incrementAndGet(), 
"", serializedCas);
-               }
-
-               private IMetaTask getMetaMetaCas() {
-                       //IMetaMetaCas mmc = new MetaMetaCas();
-                                       
-                       String serializedCas = "Bogus";
-
-                       IMetaTask metaCas = getMetaCas(serializedCas);
-                       
-               //      mmc.setMetaCas(metaCas);
-                       //return mmc;
-                       return metaCas;
-               }
-               public String getSerializedCAS() {
-                       //logger.log(Level.INFO,"getSerializedCAS() Call 
"+seqno.incrementAndGet()
-                       //        + " - from 
"+taskConsumer.getType()+":"+taskConsumer.getHostName()+"-"+taskConsumer.getPid()+"-"+taskConsumer.getThreadId()
 );
-                       String serializedCas = null;
-                       try {
-                               CAS cas = null;
-                               cas = CasCreationUtils.createCas(new 
TypeSystemDescription_impl(), null, null);
-                               cas.setDocumentLanguage("en");
-                               
-                               //logger.log(Level.INFO,"delivering: " + text);
-                               cas.setDocumentText("TEST");
-//                             cas.setDocumentText("100 
"+seqno.incrementAndGet()+" 1000 0");
-
-                               serializedCas = serialize(cas);
-                               cas.reset();
-                               cas.release();
-
-                       } catch( Exception e) {
-                               //logger.log(Level.WARNING,"Error",e);
-                       }
-
-                       return serializedCas;
-               }
-               private String serialize(CAS cas) throws Exception {
-                       String serializedCas = 
uimaSerializer.serializeCasToXmi(cas);
-                       return serializedCas;
-               }
+  private Server server;
+
+  private boolean block = false;
+
+  private AtomicLong errorCount = new AtomicLong();
+
+  private AtomicLong taskCount = new AtomicLong();
+
+  private final static String app = "test";
+
+  private int httpPort = 12222;
+
+  private int maxThreads = 50;
+
+  private volatile boolean print = true;
+
+  private static UimaSerializer uimaSerializer = new UimaSerializer();
+
+  private AtomicInteger correlationIdCounter = new AtomicInteger(0);
+
+  private AtomicInteger atomicCounter = new AtomicInteger(1);
+
+  private AtomicInteger atomicErrorCounter = new AtomicInteger(16);
+
+  private volatile boolean noMoreErrors = false;
+
+  Map<String, List<ThreadMetrics>> metrics = new ConcurrentHashMap<>();
+
+  private AtomicLong idleTime = new AtomicLong();
+
+  private AtomicLong lastTime = new AtomicLong();
+
+  private AtomicLong xstreamTime = new AtomicLong();
+
+  private ThreadLocal<HashMap<Long, XStream>> localXStream = new 
ThreadLocal<HashMap<Long, XStream>>() {
+    @Override
+    protected HashMap<Long, XStream> initialValue() {
+      return new HashMap<Long, XStream>();
+    }
+  };
+
+  protected String getApp() {
+    return app;
+  }
+
+  protected int getJettyPort() {
+    while (true) {
+      ServerSocket socket = null;
+      try {
+        socket = new ServerSocket(httpPort);
+        break;
+      } catch (IOException e) {
+        httpPort++;
+      } finally {
+        if (socket != null) {
+          try {
+            socket.close();
+          } catch (Exception ee) {
+          }
+
+        }
+      }
+    }
+    return httpPort;
+  }
+
+  protected int getPort() {
+
+    return httpPort;
+  }
+
+  public void startJetty(boolean block) throws Exception {
+    this.block = block;
+
+    QueuedThreadPool threadPool = new QueuedThreadPool();
+    if (maxThreads < threadPool.getMinThreads()) {
+      System.out.println("Invalid value for jetty MaxThreads(" + maxThreads
+              + ") - it should be greater or equal to " + 
threadPool.getMinThreads()
+              + ". Defaulting to jettyMaxThreads=" + 
threadPool.getMaxThreads());
+      threadPool.setMaxThreads(threadPool.getMinThreads());
+    } else {
+      threadPool.setMaxThreads(maxThreads);
+    }
+
+    server = new Server(threadPool);
+
+    // Server connector
+    ServerConnector connector = new ServerConnector(server);
+    System.out.println(">>>> Jetty Acceptors:" + connector.getAcceptors());
+
+    connector.setPort(getJettyPort());
+    server.setConnectors(new Connector[] { connector });
+    System.out.println("launching Jetty on Port:" + connector.getPort());
+    ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
+    context.setContextPath("/");
+    server.setHandler(context);
+
+    context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/" + app);
+
+    server.start();
+    System.out.println("Jetty Started - Waiting for Messages ...");
+  }
+
+  @After
+  public void stopJetty() {
+    try {
+      if (server != null) {
+        UIMAFramework.getLogger().log(Level.INFO, "Stopping Jetty");
+        server.stop();
+
+      }
+      // System.out.println(">>>>>>>>>>>>>>> IDLE 
TIME:"+(idleTime.longValue()/1000));
+
+      if (print) {
+        print = false;
+
+        for (Entry<String, List<ThreadMetrics>> me : metrics.entrySet()) {
+          StringBuilder sb = new StringBuilder();
+          sb.append("Service Thread Id:").append(me.getKey()).append(" Number 
of Tasks Processed:")
+                  .append(me.getValue().size());
+          int i = 0;
+          for (ThreadMetrics tm : me.getValue()) {
+            long analysisTime = 0;
+            try {
+              analysisTime = Long.parseLong(tm.getAnalysisTime());
+            } catch (Exception e) {
+            }
+            ThreadMetrics previous = null;
+            if (i > 0) {
+              previous = me.getValue().get(i - 1);
+            }
+            sb.append("\n\tTask ").append(tm.getCorrelationId()).// append(" 
Ack Time:").
+                    append(tm.getAckTime() - tm.getGetTime()).append(" 
Get-Ack-End:")
+                    .append(tm.getEndTime() - tm.getGetTime()).append(" ms")
+                    .append(" Analysis 
Time:").append(tm.getAnalysisTime()).append(" Overhead:")
+                    .append((tm.getEndTime() - tm.getGetTime()) - 
analysisTime);
+            if (previous != null) {
+              sb.append(" Idle Time:").append(tm.getGetTime() - 
previous.getEndTime());
+            }
+            i++;
+
+          }
+
+          System.out.println(sb.toString());
+          System.out.println(">>>>> Total Tasks Processed:" + taskCount + " 
Client Time in xstream:"
+                  + (xstreamTime.get() / 1000));
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    UIMAFramework.getLogger().log(Level.INFO, "Jetty Stopped");
+  }
+
+  public class TaskHandlerServlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    public TaskHandlerServlet() {
+    }
+
+    protected void doPost(HttpServletRequest request, HttpServletResponse 
response)
+            throws ServletException, IOException {
+      try {
+        // System.out.println("Handling HTTP Post Request");
+        // long post_stime = System.nanoTime();
+        StringBuilder sb = new StringBuilder();
+        BufferedReader reader = request.getReader();
+        String line;
+        while ((line = reader.readLine()) != null) {
+          sb.append(line);
+        }
+        String content = sb.toString().trim();
+
+        // System.out.println( "Http Request Body:::"+String.valueOf(content));
+
+        if (localXStream.get().get(Thread.currentThread().getId()) == null) {
+          localXStream.get().put(Thread.currentThread().getId(), 
XStreamUtils.getXStreamInstance());// new
+                                                                               
                     // XStream(new
+                                                                               
                     // DomDriver()));
+        }
+
+        String nodeIP = request.getHeader("IP");
+        String nodeName = request.getHeader("Hostname");
+        String threadID = request.getHeader("ThreadID");
+        String pid = request.getHeader("PID");
+        // System.out.println( "Sender ID:::Node IP"+nodeIP+" Node 
Name:"+nodeName+" PID:"+pid+"
+        // ThreadID:"+threadID);
+
+        IMetaTaskTransaction imt = null;
+        long t1 = System.currentTimeMillis();
+        // imt = (IMetaTaskTransaction) XStreamUtils.unmarshall(content);
+        imt = (IMetaTaskTransaction) 
localXStream.get().get(Thread.currentThread().getId())
+                .fromXML(content);
+        xstreamTime.addAndGet(System.currentTimeMillis() - t1);
+        IMetaTaskTransaction.Type type = imt.getType();
+        switch (type) {
+          case Get:
+            // idleTime.addAndGet(System.currentTimeMillis() - 
lastTime.longValue());
+            // lastTime.set(System.currentTimeMillis());
+
+            taskCount.incrementAndGet();
+            // System.out.println("---- Driver handling GET Request -- 
Thread:"+threadID);
+            List<ThreadMetrics> tmList = null;
+            if (metrics.containsKey(threadID)) {
+              tmList = metrics.get(threadID);
+            } else {
+              tmList = new ArrayList<>();
+              metrics.put(threadID, tmList);
+
+            }
+            ThreadMetrics tm = new ThreadMetrics();
+            tm.setGetTime(System.currentTimeMillis());
+            tmList.add(tm);
+
+            imt.setMetaTask(getMetaMetaCas());
+
+            imt.getMetaTask().setAppData("CorrelationID-" + 
correlationIdCounter.incrementAndGet());
+            tm.setCorrelationId(imt.getMetaTask().getAppData());
+            if (System.getProperty("simulate.no.work") == null || 
noMoreErrors) {
+              imt.getMetaTask().setUserSpaceTask(getSerializedCAS());
+            } else {
+              System.out
+                      .println("---- Driver handling GET Request -- Client Out 
of Tasks -- Thread:"
+                              + threadID);
+              imt.getMetaTask().setUserSpaceTask(null);
+              if (atomicErrorCounter.decrementAndGet() == 0) {
+                noMoreErrors = true;
+              }
+            }
+            // handleMetaCasTransationGet(trans, taskConsumer);
+            break;
+          case Ack:
+            // System.out.println("---- Driver handling ACK Request - ");
+            List<ThreadMetrics> tmList2 = metrics.get(threadID);
+            for (ThreadMetrics tm2 : tmList2) {
+              if 
(imt.getMetaTask().getAppData().equals(tm2.getCorrelationId())) {
+                tm2.setAckTime(System.currentTimeMillis());
+                break;
+              }
+            }
+
+            // handleMetaCasTransationAck(trans, taskConsumer);
+            break;
+          case End:
+            // System.out.println("---- Driver handling END Request -
+            // "+imt.getMetaTask().getAppData());
+            List<ThreadMetrics> tmList3 = metrics.get(threadID);
+            for (ThreadMetrics tm3 : tmList3) {
+              if 
(imt.getMetaTask().getAppData().equals(tm3.getCorrelationId())) {
+                tm3.setEndTime(System.currentTimeMillis());
+                if (imt.getMetaTask().getPerformanceMetrics() != null) {
+                  String metrics = imt.getMetaTask().getPerformanceMetrics();
+                  int start = metrics.indexOf("<analysisTime>") + 
"<analysisTime>".length();
+                  int end = metrics.indexOf("</analysisTime>");
+                  String analysisTime = metrics.substring(start, end);
+                  // System.out.println(">>>>>>>>>>>>>>>>> Analysis 
Time:"+analysisTime);
+                  tm3.setAnalysisTime(analysisTime);
+                }
+                break;
+              }
+            }
+            // handleMetaCasTransationEnd(trans, taskConsumer);
+            if (imt.getMetaTask().getUserSpaceException() != null) {
+              System.out.println("Client received error#" + 
errorCount.incrementAndGet());
+            }
+
+            break;
+          case InvestmentReset:
+            // handleMetaCasTransationInvestmentReset(trans, rwt);
+            break;
+          default:
+            break;
+        }
+        // process service request
+        // taskProtocolHandler.handle(imt);
+
+        // long marshall_stime = System.nanoTime();
+        // setup reply
+
+        imt.setDirection(Direction.Response);
+
+        response.setStatus(HttpServletResponse.SC_OK);
+
+        response.setHeader("content-type", "text/xml");
+        // String body = XStreamUtils.marshall(imt);
+        String body = 
localXStream.get().get(Thread.currentThread().getId()).toXML(imt);
+        if (block) {
+          synchronized (this) {
+            this.wait(0);
+          }
+
+        }
+
+        // System.out.println("Sending response");
+        response.getWriter().write(body);
+
+        // response.getWriter().write(content);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (Throwable e) {
+        e.printStackTrace();
+        throw new ServletException(e);
+      }
+    }
+
+  }
+
+  public long getErrorCount() {
+    return errorCount.get();
+  }
+
+  private IMetaTask getMetaCas(String serializedCas) {
+    if (serializedCas == null) {
+      return null;
+    }
+    return new MetaTask(atomicCounter.incrementAndGet(), "", serializedCas);
+  }
+
+  private IMetaTask getMetaMetaCas() {
+    // IMetaMetaCas mmc = new MetaMetaCas();
+
+    String serializedCas = "Bogus";
+
+    IMetaTask metaCas = getMetaCas(serializedCas);
+
+    // mmc.setMetaCas(metaCas);
+    // return mmc;
+    return metaCas;
+  }
+
+  public String getSerializedCAS() {
+    // logger.log(Level.INFO,"getSerializedCAS() Call "+seqno.incrementAndGet()
+    // + " - from
+    // 
"+taskConsumer.getType()+":"+taskConsumer.getHostName()+"-"+taskConsumer.getPid()+"-"+taskConsumer.getThreadId()
+    // );
+    String serializedCas = null;
+    try {
+      CAS cas = null;
+      cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, 
null);
+      cas.setDocumentLanguage("en");
+
+      // logger.log(Level.INFO,"delivering: " + text);
+      cas.setDocumentText("TEST");
+      // cas.setDocumentText("100 "+seqno.incrementAndGet()+" 1000 0");
+
+      serializedCas = serialize(cas);
+      cas.reset();
+      cas.release();
+
+    } catch (Exception e) {
+      // logger.log(Level.WARNING,"Error",e);
+    }
+
+    return serializedCas;
+  }
+
+  private String serialize(CAS cas) throws Exception {
+    String serializedCas = uimaSerializer.serializeCasToXmi(cas);
+    return serializedCas;
+  }
+
+  private class ThreadMetrics {
+    long getTime;
+
+    long ackTime;
+
+    long endTime;
+
+    long idleTime;
+
+    long lastTime;
+
+    String correlationId;
+
+    String analysisTime;
+
+    public long getLastTime() {
+      return lastTime;
+    }
+
+    public void setLastTime(long lastTime) {
+      this.lastTime = lastTime;
+    }
+
+    public String getAnalysisTime() {
+      return analysisTime;
+    }
+
+    public void setAnalysisTime(String analysisTime) {
+      this.analysisTime = analysisTime;
+    }
+
+    public long getIdleTime() {
+      return idleTime;
+    }
+
+    public void setIdleTime(long idleTime) {
+      this.idleTime = idleTime;
+    }
+
+    public String getCorrelationId() {
+      return correlationId;
+    }
+
+    public void setCorrelationId(String correlationId) {
+      this.correlationId = correlationId;
+    }
+
+    public long getGetTime() {
+      return getTime;
+    }
+
+    public void setGetTime(long getTime) {
+      this.getTime = getTime;
+    }
+
+    public long getAckTime() {
+      return ackTime;
+    }
+
+    public void setAckTime(long ackTime) {
+      this.ackTime = ackTime;
+    }
+
+    public long getEndTime() {
+      return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+      this.endTime = endTime;
+    }
+
+  }
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
 Wed May  1 13:42:39 2019
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.ps.service;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.uima.ducc.ps.Client;
 import org.apache.uima.ducc.ps.service.builders.PullServiceStepBuilder;
@@ -30,269 +31,264 @@ import org.apache.uima.ducc.ps.service.p
 import org.junit.Test;
 
 public class JunitPullServiceTestCase extends Client {
-       private static final long  DELAY=5000;
-       CountDownLatch threadsReady;
-       CountDownLatch stopLatch;
-       {
-               // static initializer sets amount of time the service delays
-               // sending READY to a monitor
-               System.setProperty("ducc.service.init.delay", "3000");
-       }
-       @Test
-       public void testPullService() throws Exception {
-               System.out.println("----------------- testPullService 
-------------------");
-               int scaleout = 2;
-               super.startJetty(false);  // don't block
-               String analysisEngineDescriptor = "TestAAE";
-               System.setProperty("ducc.deploy.JpType", "uima");
-
-               IServiceProcessor processor = new
-                               UimaServiceProcessor(analysisEngineDescriptor);
-
-               String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-               IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
-                               .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
-                               .withOptionalsDone().build();
-
-               try {
-                       service.initialize();
-                       Timer fTimer = new Timer("testPullService Timer");
-                       // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer, 
false), DELAY);
-
-                       service.start();
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               } finally {
-                       super.stopJetty();
-               }
-       }
-       @Test
-       public void testPullServiceQuiesce() throws Exception {
-               System.out.println("----------------- testPullServiceQuiesce 
-------------------");
-               int scaleout = 2;
-               super.startJetty(false);  // don't block
-               String analysisEngineDescriptor = "TestAAE";
-               System.setProperty("ducc.deploy.JpType", "uima");
-               IServiceProcessor processor = new
-                               UimaServiceProcessor(analysisEngineDescriptor);
-
-               String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-               IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
-                               .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
-                               .withOptionalsDone().build();
-
-               try {
-                       service.initialize();
-                       Timer fTimer = new Timer("testPullService Timer");
-                       // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer, true), 
DELAY);
-
-                       service.start();
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               }finally {
-                       super.stopJetty();
-               }
-       }
-
-       @Test
-       public void testPullServiceTimeout() throws Exception {
-               System.out.println("----------------- testPullServiceTimeout 
-------------------");
-               super.startJetty(true);  // true=client blocks all POST requests
-               int scaleout = 12;
-               String analysisEngineDescriptor = "TestAAE";
-               IServiceProcessor processor = new
-                               UimaServiceProcessor(analysisEngineDescriptor);
-
-               String tasURL ="http://localhost:"+super.getPort()+"/test";
-
-               IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
-                               .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
-                               .withOptionalsDone().build();
-
-               try {
-                       service.initialize();
-                       System.out.println("----------- Starting Service 
.....");
-                       Timer fTimer = new Timer();
-                       //after 10sec stop the service
-                       fTimer.schedule(new MyTimerTask(service, fTimer, 
false), DELAY);
-
-                       service.start();
-
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               }finally {
-                       super.stopJetty();
-               }
-       }
-
-       @Test
-       public void testStopOnFirstError() throws Exception {
-               System.out.println("----------------- testStopOnFirstError 
-------------------");
-               int scaleout = 10;
-               super.startJetty(false);  // don't block
-               String analysisEngineDescriptor = "NoOpAE";
-               System.setProperty("ducc.deploy.JpType", "uima");
-
-               IServiceProcessor processor =
-                               new 
UimaServiceProcessor(analysisEngineDescriptor);
-               // fail on 1st error
-               processor.setErrorHandlerWindow(1,  5);
-
-               String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-               IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
-                               .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
-                               .withOptionalsDone().build();
-
-               try {
-                       System.setProperty("ProcessFail","2");
-                       service.initialize();
-
-                       service.start();
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               } finally {
-                       System.getProperties().remove("ProcessFail");
-                       super.stopJetty();
-               }
-       }
-       @Test
-       public void testTerminateOn2ErrorsInWindowOf5() throws Exception {
-               System.out.println("----------------- 
testTerminateOn2ErrorsInWindowOf5 -------------------");
-               int scaleout = 10;
-               super.startJetty(false);  // don't block
-               String analysisEngineDescriptor = "NoOpAE";
-               System.setProperty("ducc.deploy.JpType", "uima");
-
-               IServiceProcessor processor =
-                               new 
UimaServiceProcessor(analysisEngineDescriptor);
-               // fail on 2nd error in a window of 5
-               processor.setErrorHandlerWindow(2,  5);
-               String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-               IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
-                               .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
-                               .withOptionalsDone().build();
-
-               try {
-                       // fail task#1 and task#3 which should stop the test
-                       System.setProperty("ProcessFail","1,3");
-                       service.initialize();
-
-                       service.start();
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               } finally {
-                       System.getProperties().remove("ProcessFail");
-                       super.stopJetty();
-               }
-       }
-       @Test
-       public void testProcessFailureDefaultErrorHandler() throws Exception {
-               System.out.println("----------------- 
testProcessFailureDefaultErrorHandler -------------------");
-               int scaleout = 14;
-               super.startJetty(false);  // don't block
-               String analysisEngineDescriptor = "NoOpAE";
-               IServiceProcessor processor = new
-                               UimaServiceProcessor(analysisEngineDescriptor);
-
-               String tasURL = "http://localhost:"+super.getPort()+"/test";
-
-               IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
-                               .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
-                               .withOptionalsDone().build();
-
-               try {
-                       // fail on 2nd task. This should terminate the test
-                        System.setProperty("ProcessFail","20");
-                       service.initialize();
-                       Timer fTimer = new Timer("testPullService Timer");
-                       // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer, 
false), 20000);
-
-                       service.start();
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               } finally {
-                       System.getProperties().remove("ProcessFail");
-                       super.stopJetty();
-               }
-       }
-
-       /*
-       @Test
-       public void testPullServiceBadClientURL() throws Exception {
-               int scaleout = 2;
-               super.startJetty(false);  // don't block
-               String analysisEngineDescriptor = "TestAAE";
-               IServiceProcessor processor = new
-                               UimaServiceProcessor(analysisEngineDescriptor);
-
-               String tasURL ="http://localhost2:8080/test";;
-
-               IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
-                               .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
-                               .withOptionalsDone().build();
-
-               try {
-                       service.initialize();
-                       service.start();
-
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               }
-       }
-       */
-       class MyTimerTask extends TimerTask {
-               final IService service;
-               final Timer fTimer;
-               final boolean quiesce;
-
-               MyTimerTask(IService service, Timer fTimer, boolean quiesce) {
-                       this.service = service;
-                       this.fTimer = fTimer;
-                       this.quiesce = quiesce;
-               }
-
-               @Override
-
-               public void run() {
-                       this.cancel();
-                       fTimer.purge();
-                       fTimer.cancel();
-                       System.out.println("Timmer popped - stopping service");
-                       if (quiesce ) {
-                               service.quiesceAndStop();
-                       } else {
-                               service.stop();
-                       }
-               }
+  private static final long DELAY = 20000;
 
-       }
+  CountDownLatch threadsReady;
+
+  CountDownLatch stopLatch;
+  {
+    // static initializer sets amount of time the service delays
+    // sending READY to a monitor
+    System.setProperty("ducc.service.init.delay", "3000");
+  }
+
+  @Test
+  public void testPullService() throws Exception {
+    System.out.println("----------------- testPullService 
-------------------");
+    int scaleout = 20;
+    super.startJetty(false); // don't block
+    // String analysisEngineDescriptor = "TestAAE";
+    String analysisEngineDescriptor = "NoOpAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+    // System.setProperty("simulate.no.work", "3"); UNCOMMENT TO SIMULATE NO 
WORK
+    IServiceProcessor processor = new 
UimaServiceProcessor(analysisEngineDescriptor);
+
+    String tasURL = "http://localhost:"; + super.getPort() + "/test";
+
+    IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+    long start = System.currentTimeMillis();
+    try {
+      service.initialize();
+      Timer fTimer = new Timer("testPullService Timer");
+      // after 5secs stop the pull service
+      fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
+
+      service.start();
+
+    } catch (ServiceInitializationException e) {
+      throw e;
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      long end = System.currentTimeMillis();
+      super.stopJetty();
+
+      System.out.println("###################### start-end clock time:" + (end 
- start) / 1000);
+    }
+  }
+
+  @Test
+  public void testPullServiceQuiesce() throws Exception {
+    System.out.println("----------------- testPullServiceQuiesce 
-------------------");
+    int scaleout = 12;
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "TestAAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+    IServiceProcessor processor = new 
UimaServiceProcessor(analysisEngineDescriptor);
+
+    String tasURL = "http://localhost:"; + super.getPort() + "/test";
+
+    IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+
+    try {
+      service.initialize();
+      Timer fTimer = new Timer("testPullService Timer");
+      // after 5secs stop the pull service
+      fTimer.schedule(new MyTimerTask(service, fTimer, true), DELAY);
+
+      service.start();
+
+    } catch (ServiceInitializationException e) {
+      throw e;
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      super.stopJetty();
+    }
+  }
+
+  @Test
+  public void testPullServiceTimeout() throws Exception {
+    System.out.println("----------------- testPullServiceTimeout 
-------------------");
+    super.startJetty(true); // true=client blocks all POST requests
+    int scaleout = 12;
+    String analysisEngineDescriptor = "TestAAE";
+    IServiceProcessor processor = new 
UimaServiceProcessor(analysisEngineDescriptor);
+
+    String tasURL = "http://localhost:"; + super.getPort() + "/test";
+
+    IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+
+    try {
+      service.initialize();
+      System.out.println("----------- Starting Service .....");
+      Timer fTimer = new Timer();
+      // after 10sec stop the service
+      fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
+
+      service.start();
+
+    } catch (ServiceInitializationException e) {
+      throw e;
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      super.stopJetty();
+    }
+  }
+
+  @Test
+  public void testStopOnFirstError() throws Exception {
+    System.out.println("----------------- testStopOnFirstError 
-------------------");
+    int scaleout = 10;
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "NoOpAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+
+    IServiceProcessor processor = new 
UimaServiceProcessor(analysisEngineDescriptor);
+    // fail on 1st error
+    processor.setErrorHandlerWindow(1, 5);
+
+    String tasURL = "http://localhost:"; + super.getPort() + "/test";
+
+    IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+
+    try {
+      System.setProperty("ProcessFail", "2");
+      service.initialize();
+
+      service.start();
+
+    } catch (ServiceInitializationException e) {
+      throw e;
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      System.getProperties().remove("ProcessFail");
+      super.stopJetty();
+    }
+  }
+
+  @Test
+  public void testTerminateOn2ErrorsInWindowOf5() throws Exception {
+    System.out.println("----------------- testTerminateOn2ErrorsInWindowOf5 
-------------------");
+    int scaleout = 10;
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "NoOpAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+
+    IServiceProcessor processor = new 
UimaServiceProcessor(analysisEngineDescriptor);
+    // fail on 2nd error in a window of 5
+    processor.setErrorHandlerWindow(2, 5);
+    String tasURL = "http://localhost:"; + super.getPort() + "/test";
+
+    IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+
+    try {
+      // fail task#1 and task#3 which should stop the test
+      System.setProperty("ProcessFail", "1,3");
+      service.initialize();
+
+      service.start();
+
+    } catch (ServiceInitializationException e) {
+      throw e;
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      System.getProperties().remove("ProcessFail");
+      super.stopJetty();
+    }
+  }
+
+  @Test
+  public void testProcessFailureDefaultErrorHandler() throws Exception {
+    System.out
+            .println("----------------- testProcessFailureDefaultErrorHandler 
-------------------");
+    int scaleout = 14;
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "NoOpAE";
+    IServiceProcessor processor = new 
UimaServiceProcessor(analysisEngineDescriptor);
+
+    String tasURL = "http://localhost:"; + super.getPort() + "/test";
+
+    IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
+            .withClientURL(tasURL).withType("Note 
Service").withScaleout(scaleout)
+            .withOptionalsDone().build();
+
+    try {
+      // fail on 2nd task. This should terminate the test
+      System.setProperty("ProcessFail", "20");
+      service.initialize();
+      Timer fTimer = new Timer("testPullService Timer");
+      // after 5secs stop the pull service
+      fTimer.schedule(new MyTimerTask(service, fTimer, false), 20000);
+
+      service.start();
+
+    } catch (ServiceInitializationException e) {
+      throw e;
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      System.getProperties().remove("ProcessFail");
+      super.stopJetty();
+    }
+  }
+
+  /*
+   * @Test public void testPullServiceBadClientURL() throws Exception { int 
scaleout = 2;
+   * super.startJetty(false); // don't block String analysisEngineDescriptor = 
"TestAAE";
+   * IServiceProcessor processor = new 
UimaServiceProcessor(analysisEngineDescriptor);
+   * 
+   * String tasURL ="http://localhost2:8080/test";;
+   * 
+   * IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
+   * .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+   * .withOptionalsDone().build();
+   * 
+   * try { service.initialize(); service.start();
+   * 
+   * 
+   * } catch (ServiceInitializationException e) { throw e; } catch (Exception 
e) { throw e; } }
+   */
+  class MyTimerTask extends TimerTask {
+    final IService service;
+
+    final Timer fTimer;
+
+    final boolean quiesce;
+
+    MyTimerTask(IService service, Timer fTimer, boolean quiesce) {
+      this.service = service;
+      this.fTimer = fTimer;
+      this.quiesce = quiesce;
+    }
+
+    @Override
+
+    public void run() {
+      this.cancel();
+      fTimer.purge();
+      fTimer.cancel();
+      System.out.println("Timmer popped - stopping service");
+      if (quiesce) {
+        service.quiesceAndStop();
+      } else {
+        service.stop();
+      }
+    }
+
+  }
 
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
 Wed May  1 13:42:39 2019
@@ -28,189 +28,194 @@ import org.apache.uima.ducc.ps.service.e
 import org.apache.uima.ducc.ps.service.main.ServiceWrapper;
 import org.junit.Test;
 
-public class JUnitServiceWrapperTestCase extends Client  {
-       private static final long  DELAY=5000;
-       {
-               // static initializer sets amount of time the service delays
-               // sending READY to a monitor
-               System.setProperty("ducc.service.init.delay", "3000");
-       }
-       
-       
-       @Test
-       public void testPullServiceWrapperNoTask() throws Exception {
-               // make client return null task in response to GET
-               System.setProperty("simulate.no.work", "3");
-               System.setProperty("ducc.process.thread.sleep.time", "1000");
-               try {
-                       testPullServiceWrapper();
-               } finally {
-                       System.getProperties().remove("simulate.no.work");
-               }
-       }
-       @Test
-       public void testPullServiceWrapper() throws Exception {
-               System.out.println("-------------------------- 
testPullServiceWrapper ----------------------");;
-
-               //int scaleout = 2;
-               StateMonitor monitor = new StateMonitor();
-               monitor.start();
-               System.out.println("........... Monitor 
Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
-               super.startJetty(false);  // don't block
-               String analysisEngineDescriptor = "TestAAE";
-               System.setProperty("ducc.deploy.JpType", "uima");
-
-               String tasURL = "http://localhost:"+super.getPort()+"/test";
-               try {
-                       System.setProperty("ducc.deploy.JdURL", tasURL);
-                       System.setProperty("ducc.deploy.JpThreadCount","4");
-                       System.setProperty("ducc.deploy.service.type", 
"NotesService");
-                       System.setProperty("ducc.deploy.JpType", "uima");
-
-                       ServiceWrapper service = new ServiceWrapper();
-
-                       Timer fTimer = new Timer("testPullService Timer");
-                       // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer), 
40000);
-                               
-                       service.initialize(new String[] 
{analysisEngineDescriptor});
-
-                       service.start();
-
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               } finally {
-                       monitor.stop();
-                       super.stopJetty();
-
-               }
-       }
-       
-       @Test
-       public void testPullServiceWrapperWithProcessFailure() throws Exception 
{
-               System.out.println("-------------------------- 
testPullServiceWrapperWithProcessFailure ----------------------");;
-               //int scaleout = 2;
-               StateMonitor monitor = new StateMonitor();
-               monitor.start();
-               System.out.println("........... Monitor 
Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
-               super.startJetty(false);  // don't block
-               String analysisEngineDescriptor = "NoOpAE";
-
-               String tasURL = "http://localhost:"+super.getPort()+"/test";
-               try {
-                       // Force process failure of the first task
-                       System.setProperty("ProcessFail","1");
-                        
-                       System.setProperty("ducc.deploy.JdURL", tasURL);
-                       System.setProperty("ducc.deploy.JpThreadCount","4");
-                       System.setProperty("ducc.deploy.service.type", 
"NotesService");
-                       System.setProperty("ducc.deploy.JpType", "uima");
-                       // use default error window (1,1)
-                       ServiceWrapper service = new ServiceWrapper();
-
-                       Timer fTimer = new Timer("testPullService Timer");
-                       // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer), 
10000);
-                               
-                       service.initialize(new String[] 
{analysisEngineDescriptor});
-
-                       service.start();
-
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               } finally {
-                       monitor.stop();
-                       System.getProperties().remove("ProcessFail");
-                       super.stopJetty();
-               }
-       }
-       @Test
-       public void testPullServiceWrapperDDGenerator() throws Exception {
-               System.out.println("-------------------------- 
testPullServiceWrapperDDGenerator ----------------------");;
-
-               //int scaleout = 2;
-               StateMonitor monitor = new StateMonitor();
-               monitor.start();
-               System.out.println("........... Monitor 
Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
-               super.startJetty(false);  // don't block
-               // Dont change the name of TestAAE.xml. This is setup to fail 
file lookup and force 
-               // generation of AE descriptor.
-               String analysisEngineDescriptor = "TestAAE.xml";
-               System.setProperty("ducc.deploy.JpType", "uima");
-
-               String tasURL = "http://localhost:"+super.getPort()+"/test";
-               try {
-                       
-                       System.setProperty("ducc.deploy.JdURL", tasURL);
-                       System.setProperty("ducc.deploy.JpThreadCount","4");
-                       System.setProperty("ducc.deploy.service.type", 
"NotesService");
-                       System.setProperty("ducc.deploy.JpType", "uima");
-                       
System.setProperty("ducc.deploy.JpAeDescriptor","NoOpAE");
-                       
System.setProperty("ducc.deploy.JobDirectory",System.getProperty("user.dir"));
-                       
System.setProperty("ducc.deploy.JpFlowController","org.apache.uima.flow.FixedFlowController");
-                       
System.setProperty("ducc.process.log.dir",System.getProperty("user.dir"));
-                       System.setProperty("ducc.job.id","2000");
-                       ServiceWrapper service = new ServiceWrapper();
-
-                       Timer fTimer = new Timer("testPullService Timer");
-                       // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer), 
20000);
-                               
-                       service.initialize(new String[] 
{analysisEngineDescriptor});
-
-                       service.start();
-                       
-
-               } catch (ServiceInitializationException e) {
-                       throw e;
-               } catch (Exception e) {
-                       throw e;
-               } finally {
-                       monitor.stop();
-                       super.stopJetty();
-                       File directory = new 
File(System.getProperty("user.dir").
-                                        
concat("/").concat(System.getProperty("ducc.job.id")));
-                       
-                       if ( directory.exists() ) {
-                               for (File f : directory.listFiles()) {
-                                   if (f.getName().startsWith("uima-ae-")) {
-                                       f.delete();
-                                       System.out.println("Removed generated 
descriptor:"+f.getAbsolutePath());
-                                   }
-                               }
-                               directory.delete();
-                               
-                       }
-
-
-               }
-       }
-       class MyTimerTask extends TimerTask {
-               final ServiceWrapper service;
-               final Timer fTimer;
-
-               MyTimerTask(ServiceWrapper service, Timer fTimer) {
-                       this.service = service;
-                       this.fTimer = fTimer;
-               }
-
-               @Override
-
-               public void run() {
-                       this.cancel();
-                       fTimer.purge();
-                       fTimer.cancel();
-                       System.out.println("Timmer popped - stopping service");
-                       service.stop();
+public class JUnitServiceWrapperTestCase extends Client {
+  private static final long DELAY = 5000;
+  {
+    // static initializer sets amount of time the service delays
+    // sending READY to a monitor
+    System.setProperty("ducc.service.init.delay", "3000");
+  }
+
+  @Test
+  public void testPullServiceWrapperNoTask() throws Exception {
+    // make client return null task in response to GET
+    System.setProperty("simulate.no.work", "3");
+    System.setProperty("ducc.process.thread.sleep.time", "1000");
+    try {
+      testPullServiceWrapper();
+    } finally {
+      System.getProperties().remove("simulate.no.work");
+    }
+  }
+
+  @Test
+  public void testPullServiceWrapper() throws Exception {
+    System.out.println("-------------------------- testPullServiceWrapper 
----------------------");
+    ;
+
+    // int scaleout = 2;
+    StateMonitor monitor = new StateMonitor();
+    monitor.start();
+    System.out.println("........... Monitor Port:" + 
System.getProperty("DUCC_STATE_UPDATE_PORT"));
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "TestAAE";
+    System.setProperty("ducc.deploy.JpType", "uima");
+
+    String tasURL = "http://localhost:"; + super.getPort() + "/test";
+    try {
+      System.setProperty("ducc.deploy.JdURL", tasURL);
+      System.setProperty("ducc.deploy.JpThreadCount", "12");
+      System.setProperty("ducc.deploy.service.type", "NotesService");
+      System.setProperty("ducc.deploy.JpType", "uima");
+
+      ServiceWrapper service = new ServiceWrapper();
+
+      Timer fTimer = new Timer("testPullService Timer");
+      // after 5secs stop the pull service
+      fTimer.schedule(new MyTimerTask(service, fTimer), 40000);
+
+      service.initialize(new String[] { analysisEngineDescriptor });
+
+      service.start();
+
+    } catch (ServiceInitializationException e) {
+      throw e;
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      monitor.stop();
+      super.stopJetty();
+
+    }
+  }
+
+  @Test
+  public void testPullServiceWrapperWithProcessFailure() throws Exception {
+    System.out.println(
+            "-------------------------- 
testPullServiceWrapperWithProcessFailure ----------------------");
+    ;
+    // int scaleout = 2;
+    StateMonitor monitor = new StateMonitor();
+    monitor.start();
+    System.out.println("........... Monitor Port:" + 
System.getProperty("DUCC_STATE_UPDATE_PORT"));
+    super.startJetty(false); // don't block
+    String analysisEngineDescriptor = "NoOpAE";
+
+    String tasURL = "http://localhost:"; + super.getPort() + "/test";
+    try {
+      // Force process failure of the first task
+      System.setProperty("ProcessFail", "1");
+
+      System.setProperty("ducc.deploy.JdURL", tasURL);
+      System.setProperty("ducc.deploy.JpThreadCount", "4");
+      System.setProperty("ducc.deploy.service.type", "NotesService");
+      System.setProperty("ducc.deploy.JpType", "uima");
+      // use default error window (1,1)
+      ServiceWrapper service = new ServiceWrapper();
+
+      Timer fTimer = new Timer("testPullService Timer");
+      // after 5secs stop the pull service
+      fTimer.schedule(new MyTimerTask(service, fTimer), 10000);
+
+      service.initialize(new String[] { analysisEngineDescriptor });
+
+      service.start();
+
+    } catch (ServiceInitializationException e) {
+      throw e;
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      monitor.stop();
+      System.getProperties().remove("ProcessFail");
+      super.stopJetty();
+    }
+  }
+
+  @Test
+  public void testPullServiceWrapperDDGenerator() throws Exception {
+    System.out.println(
+            "-------------------------- testPullServiceWrapperDDGenerator 
----------------------");
+    ;
+
+    // int scaleout = 2;
+    StateMonitor monitor = new StateMonitor();
+    monitor.start();
+    System.out.println("........... Monitor Port:" + 
System.getProperty("DUCC_STATE_UPDATE_PORT"));
+    super.startJetty(false); // don't block
+    // Dont change the name of TestAAE.xml. This is setup to fail file lookup 
and force
+    // generation of AE descriptor.
+    String analysisEngineDescriptor = "TestAAE.xml";
+    System.setProperty("ducc.deploy.JpType", "uima");
+
+    String tasURL = "http://localhost:"; + super.getPort() + "/test";
+    try {
+
+      System.setProperty("ducc.deploy.JdURL", tasURL);
+      System.setProperty("ducc.deploy.JpThreadCount", "4");
+      System.setProperty("ducc.deploy.service.type", "NotesService");
+      System.setProperty("ducc.deploy.JpType", "uima");
+      System.setProperty("ducc.deploy.JpAeDescriptor", "NoOpAE");
+      System.setProperty("ducc.deploy.JobDirectory", 
System.getProperty("user.dir"));
+      System.setProperty("ducc.deploy.JpFlowController",
+              "org.apache.uima.flow.FixedFlowController");
+      System.setProperty("ducc.process.log.dir", 
System.getProperty("user.dir"));
+      System.setProperty("ducc.job.id", "2000");
+      ServiceWrapper service = new ServiceWrapper();
+
+      Timer fTimer = new Timer("testPullService Timer");
+      // after 5secs stop the pull service
+      fTimer.schedule(new MyTimerTask(service, fTimer), 20000);
+
+      service.initialize(new String[] { analysisEngineDescriptor });
+
+      service.start();
+
+    } catch (ServiceInitializationException e) {
+      throw e;
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      monitor.stop();
+      super.stopJetty();
+      File directory = new File(
+              
System.getProperty("user.dir").concat("/").concat(System.getProperty("ducc.job.id")));
+
+      if (directory.exists()) {
+        for (File f : directory.listFiles()) {
+          if (f.getName().startsWith("uima-ae-")) {
+            f.delete();
+            System.out.println("Removed generated descriptor:" + 
f.getAbsolutePath());
+          }
+        }
+        directory.delete();
+
+      }
+
+    }
+  }
+
+  class MyTimerTask extends TimerTask {
+    final ServiceWrapper service;
+
+    final Timer fTimer;
+
+    MyTimerTask(ServiceWrapper service, Timer fTimer) {
+      this.service = service;
+      this.fTimer = fTimer;
+    }
+
+    @Override
+
+    public void run() {
+      this.cancel();
+      fTimer.purge();
+      fTimer.cancel();
+      System.out.println("Timmer popped - stopping service");
+      service.stop();
 
-               }
+    }
 
-       }
+  }
 
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
 Wed May  1 13:42:39 2019
@@ -24,6 +24,7 @@ import java.lang.management.ManagementFa
 import java.lang.management.RuntimeMXBean;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.uima.UIMAFramework;
@@ -126,7 +127,24 @@ public class NoOpAE extends CasAnnotator
                                }
                }
                }
+               try {
+                       //int n = getRandomNumberInRange(2000,3000);
+                       int n = getRandomNumberInRange(250,450);
+                       //System.out.println(" AE Sleeping for "+n + " millis");
+                Thread.sleep(n);
+                       
+               } catch( InterruptedException e) {
+                       
+               }
     }
+    private static int getRandomNumberInRange(int min, int max) {
 
+               if (min >= max) {
+                       throw new IllegalArgumentException("max must be greater 
than min");
+               }
+
+               Random r = new Random();
+               return r.nextInt((max - min) + 1) + min;
+       }
 
 }


Reply via email to