Author: cwiklik
Date: Wed May  1 13:42:39 2019
New Revision: 1858488

URL: http://svn.apache.org/viewvc?rev=1858488&view=rev
Log:
UIMA-6026 removed synchronization from client and pullservice. Also created 
XStream instance per thread and pin it via ThreadLocal

Modified:
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java
 Wed May  1 13:42:39 2019
@@ -1,20 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+/*                                                                             
                                                                                
                                      
+ * Licensed to the Apache Software Foundation (ASF) under one                  
                                                                                
                                      
+ * or more contributor license agreements.  See the NOTICE file                
                                                                                
                                      
+ * distributed with this work for additional information                       
                                                                                
                                      
+ * regarding copyright ownership.  The ASF licenses this file                  
                                                                                
                                      
+ * to you under the Apache License, Version 2.0 (the                           
                                                                                
                                      
+ * "License"); you may not use this file except in compliance                  
                                                                                
                                      
+ * with the License.  You may obtain a copy of the License at                  
                                                                                
                                      
+ *                                                                             
                                                                                
                                      
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
                                                                                
                                      
+ *                                                                             
                                                                                
                                      
+ * Unless required by applicable law or agreed to in writing,                  
                                                                                
                                      
+ * software distributed under the License is distributed on an                 
                                                                                
                                      
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY                      
                                                                                
                                      
+ * KIND, either express or implied.  See the License for the                   
                                                                                
                                      
+ * specific language governing permissions and limitations                     
                                                                                
                                      
+ * under the License.                                                          
                                                                                
                                      
 */
 
 package org.apache.uima.ducc.ps.sd.task;
@@ -42,138 +42,145 @@ import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 
 public class DuccServiceTaskProtocolHandler implements TaskProtocolHandler {
-       Logger logger = 
UIMAFramework.getLogger(DuccServiceTaskProtocolHandler.class);
-       private volatile boolean running = true;;
-       private static AtomicInteger atomicCounter =
-                       new AtomicInteger(0);
-
-       public DuccServiceTaskProtocolHandler(TaskAllocatorCallbackListener 
taskAllocator) {
-       }
-
-       @Override
-       public String initialize(Properties props) throws TaskProtocolException 
{
-               return null;
-       }
-
-       @Override
-       public void handle(IMetaTaskTransaction wi) throws 
TaskProtocolException {
-               handleMetaTaskTransation(wi);
-
-       }
-       private void handleMetaTaskTransation(IMetaTaskTransaction trans) {
-               try {
-                       trans.setResponseHints(new ArrayList<Hint>());
-
-                       TaskConsumer taskConsumer =
-                                       new WiTaskConsumer(trans);
-
-                       MessageBuffer mb = new MessageBuffer();
-                       
mb.append(Standardize.Label.remote.get()+taskConsumer.toString());
-                       mb.append(Standardize.Label.type.get()+trans.getType());
-                       Type type = trans.getType();
-                       switch(type) {
-                       case Get:
-                           if ( logger.isLoggable(Level.FINE)) {
-                                       logger.log(Level.FINE,"---- Driver 
handling GET Request - Requestor:"+taskConsumer.toString());
-                           }
-                               handleMetaTaskTransationGet(trans, 
taskConsumer);
-                               break;
-                       case Ack:
-                           if ( logger.isLoggable(Level.FINE)) {
-                                       logger.log(Level.FINE,"---- Driver 
handling ACK Request - Requestor:"+taskConsumer.toString());
-                           }
-                               handleMetaTaskTransationAck(trans, 
taskConsumer);
-                               break;
-                       case End:
-                           if ( logger.isLoggable(Level.FINE)) {
-                                       logger.log(Level.FINE,"---- Driver 
handling END Request - Requestor:"+taskConsumer.toString());
-                           }
-                               handleMetaTaskTransationEnd(trans, 
taskConsumer);
-                               break;
-                       case InvestmentReset:
-                       //      handleMetaCasTransationInvestmentReset(trans, 
rwt);
-                               break;
-                       default:
-                               break;
-                       }
-                       IMetaTask metaCas = trans.getMetaTask();
-                       if(metaCas != null) {
-                               metaCas.setPerformanceMetrics(null);
-                               metaCas.setUserSpaceException(null);
-                       }
-               }
-               catch(Exception e) {
-                       logger.log(Level.WARNING,"Error",e);
-               }
-               finally {
-               }
-       }
-
-       @Override
-       public String start() throws Exception {
-               running = true;
-               return null;
-       }
-
-       @Override
-       public void stop() throws Exception {
-               running = false;
-       }
-       private void handleMetaTaskTransationGet(IMetaTaskTransaction trans, 
TaskConsumer taskConsumer) {
-               IMetaMetaTask mmc = getMetaMetaTask(taskConsumer);
-               trans.setMetaTask( mmc.getMetaCas());
-       }
-       private IMetaTask getMetaTask(String serializedCas) {
-               if ( serializedCas == null ) {
-                       return null;
-               }
-               return new MetaTask(atomicCounter.incrementAndGet(), "", 
serializedCas);
-       }
-
-       private synchronized IMetaMetaTask getMetaMetaTask(TaskConsumer 
taskConsumer) {
-               IMetaMetaTask mmc = new MetaMetaTask();
-               ServiceDriver sd = DuccServiceDriver.getInstance();
-               TaskAllocatorCallbackListener taskAllocator =
-                               sd.getTaskAllocator();
-               ITask task;
-               
-               task = taskAllocator.getTask(taskConsumer);
-               if ( task != null && !task.isEmpty() ) {
-                       IMetaTask metaTask = getMetaTask(task.asString());
-                       mmc.setMetaCas(metaTask);
-                       if ( logger.isLoggable(Level.FINE)) {
-                          logger.log(Level.FINE,"Returning TASK with 
appdata:"+task.getMetadata()+" to the service");
-                       }
-                       mmc.getMetaCas().setAppData(task.getMetadata());
-               }
-               return mmc;
-       }
-       private void handleMetaTaskTransationAck(IMetaTaskTransaction trans, 
TaskConsumer taskConsumer) {
-
-       }
-
-       private void handleMetaTaskTransationEnd(IMetaTaskTransaction trans, 
TaskConsumer taskConsumer) {
-               ServiceDriver sd = DuccServiceDriver.getInstance();
-               TaskAllocatorCallbackListener taskAllocator =
-                               sd.getTaskAllocator();
-               if ( trans.getMetaTask().getUserSpaceException() != null ) {
-                 // The service returns a stringified stacktrace ... not an 
Exception
-                       String exceptionAsString = (String) 
trans.getMetaTask().getUserSpaceException();
-                       taskAllocator.onTaskFailure( taskConsumer, 
trans.getMetaTask().getAppData(), exceptionAsString );
-
-               } else {
-                       String m =
-                                       
trans.getMetaTask().getPerformanceMetrics();
-                   if ( logger.isLoggable(Level.FINE)) {
-                       
logger.log(Level.FINE,"handleMetaTaskTransationEnd()........... 
appdata:"+trans.getMetaTask().getAppData());
-                   }
-                       taskAllocator.onTaskSuccess( 
taskConsumer,trans.getMetaTask().getAppData(), m );
-               }
-       }
+  Logger logger = 
UIMAFramework.getLogger(DuccServiceTaskProtocolHandler.class);
 
-       public static void main(String[] args) {
-               // TODO Auto-generated method stub
+  private volatile boolean running = true;
 
-       }
+  private static AtomicInteger atomicCounter = new AtomicInteger(0);
+
+  private static AtomicInteger nHandles = new AtomicInteger(0);
+
+  public DuccServiceTaskProtocolHandler(TaskAllocatorCallbackListener 
taskAllocator) {
+  }
+
+  @Override
+  public String initialize(Properties props) throws TaskProtocolException {
+    return null;
+  }
+
+  @Override
+  public void handle(IMetaTaskTransaction wi) throws TaskProtocolException {
+    handleMetaTaskTransation(wi);
+  }
+
+  private void handleMetaTaskTransation(IMetaTaskTransaction trans) {
+    try {
+      trans.setResponseHints(new ArrayList<Hint>());
+
+      TaskConsumer taskConsumer = new WiTaskConsumer(trans);
+
+      MessageBuffer mb = new MessageBuffer();
+      mb.append(Standardize.Label.remote.get() + taskConsumer.toString());
+      mb.append(Standardize.Label.type.get() + trans.getType());
+      Type type = trans.getType();
+      switch (type) {
+        case Get:
+          if (logger.isLoggable(Level.FINE)) {
+            logger.log(Level.FINE,
+                    "---- Driver handling GET Request - Requestor:" + 
taskConsumer.toString());
+          }
+          handleMetaTaskTransationGet(trans, taskConsumer);
+          break;
+        case Ack:
+          if (logger.isLoggable(Level.FINE)) {
+            logger.log(Level.FINE,
+                    "---- Driver handling ACK Request - Requestor:" + 
taskConsumer.toString());
+          }
+          handleMetaTaskTransationAck(trans, taskConsumer);
+          break;
+        case End:
+          if (logger.isLoggable(Level.FINE)) {
+            logger.log(Level.FINE,
+                    "---- Driver handling END Request - Requestor:" + 
taskConsumer.toString());
+          }
+          handleMetaTaskTransationEnd(trans, taskConsumer);
+          break;
+        case InvestmentReset:
+          // handleMetaCasTransationInvestmentReset(trans, rwt);
+          break;
+        default:
+          break;
+      }
+      IMetaTask metaCas = trans.getMetaTask();
+      if (metaCas != null) {
+        metaCas.setPerformanceMetrics(null);
+        metaCas.setUserSpaceException(null);
+      }
+    } catch (Exception e) {
+      logger.log(Level.WARNING, "Error", e);
+    } finally {
+    }
+  }
+
+  @Override
+  public String start() throws Exception {
+    running = true;
+    return null;
+  }
+
+  @Override
+  public void stop() throws Exception {
+    running = false;
+  }
+
+  private void handleMetaTaskTransationGet(IMetaTaskTransaction trans, 
TaskConsumer taskConsumer) {
+    IMetaMetaTask mmc = getMetaMetaTask(taskConsumer);
+    trans.setMetaTask(mmc.getMetaCas());
+  }
+
+  private IMetaTask getMetaTask(String serializedCas) {
+    if (serializedCas == null) {
+      return null;
+    }
+    return new MetaTask(atomicCounter.incrementAndGet(), "", serializedCas);
+  }
+
+  private IMetaMetaTask getMetaMetaTask(TaskConsumer taskConsumer) {
+
+    IMetaMetaTask mmc = new MetaMetaTask();
+    ServiceDriver sd = DuccServiceDriver.getInstance();
+    TaskAllocatorCallbackListener taskAllocator = sd.getTaskAllocator();
+    ITask task;
+
+    task = taskAllocator.getTask(taskConsumer);
+    if (task != null && !task.isEmpty()) {
+      IMetaTask metaTask = getMetaTask(task.asString());
+      mmc.setMetaCas(metaTask);
+      if (logger.isLoggable(Level.FINE)) {
+        logger.log(Level.FINE,
+                "Returning TASK with appdata:" + task.getMetadata() + " to the 
service");
+      }
+      mmc.getMetaCas().setAppData(task.getMetadata());
+    }
+    return mmc;
+  }
+
+  private void handleMetaTaskTransationAck(IMetaTaskTransaction trans, 
TaskConsumer taskConsumer) {
+
+  }
+
+  private void handleMetaTaskTransationEnd(IMetaTaskTransaction trans, 
TaskConsumer taskConsumer) {
+    ServiceDriver sd = DuccServiceDriver.getInstance();
+    TaskAllocatorCallbackListener taskAllocator = sd.getTaskAllocator();
+    if (trans.getMetaTask().getUserSpaceException() != null) {
+      // The service returns a stringified stacktrace ... not an Exception
+      String exceptionAsString = (String) 
trans.getMetaTask().getUserSpaceException();
+      taskAllocator.onTaskFailure(taskConsumer, 
trans.getMetaTask().getAppData(),
+              exceptionAsString);
+
+    } else {
+      String m = trans.getMetaTask().getPerformanceMetrics();
+      if (logger.isLoggable(Level.FINE)) {
+        logger.log(Level.FINE, "handleMetaTaskTransationEnd()........... 
appdata:"
+                + trans.getMetaTask().getAppData());
+      }
+      taskAllocator.onTaskSuccess(taskConsumer, 
trans.getMetaTask().getAppData(), m);
+    }
+  }
+
+  public static void main(String[] args) {
+    // TODO Auto-generated method stub
+
+  }
 
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java
 Wed May  1 13:42:39 2019
@@ -22,6 +22,7 @@ package org.apache.uima.ducc.ps.sd.task.
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.util.HashMap;
 import java.util.Properties;
 
 import javax.servlet.ServletException;
@@ -47,218 +48,231 @@ import org.eclipse.jetty.servlet.Servlet
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 
+import com.thoughtworks.xstream.XStream;
+
 public class HttpTaskTransportHandler implements TaskTransportHandler {
-       Logger logger = UIMAFramework.getLogger(HttpTaskTransportHandler.class);
-       // Jetty
-       private Server server = null;
-       // Delegate to handle incoming messages
-       private TaskProtocolHandler taskProtocolHandler = null;
-    private volatile boolean running = false;
-    // mux is used to synchronize start()
-    private Object mux = new Object();
-    
-       public HttpTaskTransportHandler() {
-       }
-
-       public void setTaskProtocolHandler(TaskProtocolHandler 
taskProtocolHandler) {
-               this.taskProtocolHandler = taskProtocolHandler;
-       }
-
-       public String start() throws Exception {
-               synchronized( mux ) {
-                       if ( !running ) {
-                               if ( taskProtocolHandler == null ) {
-                                       throw new 
TaskProtocolException("start() called before initialize() - task protocol 
handler not started");
-                               }
-                               if ( server == null ) {
-                                       throw new 
TaskProtocolException("start() called before initialize() - Jetty not started 
yet");
-                               }
-
-                               server.start();
-                               logger.log(Level.INFO, "Jetty Started - Waiting 
for Messages ...");
-                               running = true;
-                       }
-               }
-               return "";
-       }
-
-       public void stop() throws Exception {
-               synchronized( mux ) {
-                       if ( server != null && server.isRunning() ) {
-                               server.stop();
-                       }
-               }
-       }
+  Logger logger = UIMAFramework.getLogger(HttpTaskTransportHandler.class);
+
+  // Jetty
+  private Server server = null;
+
+  // Delegate to handle incoming messages
+  private TaskProtocolHandler taskProtocolHandler = null;
+
+  private volatile boolean running = false;
+
+  // mux is used to synchronize start()
+  private Object mux = new Object();
+
+  // Create ThreadLocal Map containing instances of XStream for each thread
+  private ThreadLocal<HashMap<Long, XStream>> threadLocalXStream = new 
ThreadLocal<HashMap<Long, XStream>>() {
+    @Override
+    protected HashMap<Long, XStream> initialValue() {
+      return new HashMap<Long, XStream>();
+    }
+  };
+
+  public HttpTaskTransportHandler() {
+  }
 
-  public Server createServer(int httpPort, int maxThreads, String app, 
TaskProtocolHandler handler) 
+  public void setTaskProtocolHandler(TaskProtocolHandler taskProtocolHandler) {
+    this.taskProtocolHandler = taskProtocolHandler;
+  }
+
+  public String start() throws Exception {
+    synchronized (mux) {
+      if (!running) {
+        if (taskProtocolHandler == null) {
+          throw new TaskProtocolException(
+                  "start() called before initialize() - task protocol handler 
not started");
+        }
+        if (server == null) {
+          throw new TaskProtocolException(
+                  "start() called before initialize() - Jetty not started 
yet");
+        }
+
+        server.start();
+        logger.log(Level.INFO, "Jetty Started - Waiting for Messages ...");
+        running = true;
+      }
+    }
+    return "";
+  }
+
+  public void stop() throws Exception {
+    synchronized (mux) {
+      if (server != null && server.isRunning()) {
+        server.stop();
+      }
+    }
+  }
+
+  public Server createServer(int httpPort, int maxThreads, String app, 
TaskProtocolHandler handler)
           throws Exception {
 
-               // Server thread pool
-               QueuedThreadPool threadPool = new QueuedThreadPool();
-               if (maxThreads < threadPool.getMinThreads()) {
-                       // logger.warn("JobDriver", jobid,
-                       // "Invalid value for jetty MaxThreads("+maxThreads+") 
- it should be greater or equal to "+threadPool.getMinThreads()+". Defaulting 
to jettyMaxThreads="+threadPool.getMaxThreads());
-                       threadPool.setMaxThreads(threadPool.getMinThreads());
-               } else {
-                       threadPool.setMaxThreads(maxThreads);
-               }
-
-               Server server = new Server(threadPool);
-
-               // Server connector
-               ServerConnector connector = new ServerConnector(server);
-               connector.setPort(httpPort);
-               server.setConnectors(new Connector[] { connector });
-
-               ServletContextHandler context = new ServletContextHandler(
-                               ServletContextHandler.SESSIONS);
-               context.setContextPath("/");
-               server.setHandler(context);
-
-               context.addServlet(new ServletHolder(new 
TaskHandlerServlet(handler)), "/"+app);
-
-               return server;
-       }
-
-       public int findFreePort() {
-           ServerSocket socket = null;
-           int port = 0;
-           try {
-             //  by passing 0 as an arg, let ServerSocket choose an arbitrary
-             //  port that is available.
-             socket = new ServerSocket(0);
-             port = socket.getLocalPort();
-           } catch (IOException e) {
-           } finally { 
-             try {
-               // Clean up
-               if (socket != null) {
-                 socket.close(); 
-               } 
-             } catch( Exception ex) {
-                 ex.printStackTrace();
-             }
-           }
-           return port;
-         }
-       @Override
-       public String initialize(Properties properties) throws 
TaskTransportException {
-               // Max cores
+    // Server thread pool
+    QueuedThreadPool threadPool = new QueuedThreadPool();
+    if (maxThreads < threadPool.getMinThreads()) {
+      // logger.warn("JobDriver", jobid,
+      // "Invalid value for jetty MaxThreads("+maxThreads+") - it should be 
greater or equal to
+      // "+threadPool.getMinThreads()+". Defaulting to
+      // jettyMaxThreads="+threadPool.getMaxThreads());
+      threadPool.setMaxThreads(threadPool.getMinThreads());
+    } else {
+      threadPool.setMaxThreads(maxThreads);
+    }
+
+    Server server = new Server(threadPool);
+
+    // Server connector
+    ServerConnector connector = new ServerConnector(server);
+    connector.setPort(httpPort);
+    server.setConnectors(new Connector[] { connector });
+
+    ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
+    context.setContextPath("/");
+    server.setHandler(context);
+
+    context.addServlet(new ServletHolder(new TaskHandlerServlet(handler)), "/" 
+ app);
+
+    return server;
+  }
+
+  public int findFreePort() {
+    ServerSocket socket = null;
+    int port = 0;
+    try {
+      // by passing 0 as an arg, let ServerSocket choose an arbitrary
+      // port that is available.
+      socket = new ServerSocket(0);
+      port = socket.getLocalPort();
+    } catch (IOException e) {
+    } finally {
+      try {
+        // Clean up
+        if (socket != null) {
+          socket.close();
+        }
+      } catch (Exception ex) {
+        ex.printStackTrace();
+      }
+    }
+    return port;
+  }
+
+  @Override
+  public String initialize(Properties properties) throws 
TaskTransportException {
+    // Max cores
     int cores = Runtime.getRuntime().availableProcessors();
     String maxThreadsString = (String) 
properties.get(ServiceDriver.MaxThreads);
     String appName = (String) properties.get(ServiceDriver.Application);
 
-               int maxThreads = cores;
-               int httpPort = 0;
-               if (maxThreadsString != null) {
-                       try {
-                               maxThreads = 
Integer.parseInt(maxThreadsString.trim());
-                       } catch (NumberFormatException e) {
-                               logger.log(Level.WARNING,"Error",e);
-                       }
-               }
-               if (cores > maxThreads) {
-                       maxThreads = cores;
-               }       
-               
-               String portString = (String) properties.get(ServiceDriver.Port);
-               if (portString != null) {
-                       try {
-                               httpPort = Integer.parseInt(portString.trim());
-                       } catch (NumberFormatException e) {
-                               logger.log(Level.WARNING,"Error",e);
-                               throw new TaskTransportException("Unable to 
start Server using provided port:"+httpPort);
-                       }
-               } 
-    if (httpPort == 0) {     // Use any free port if none or 0 specified 
+    int maxThreads = cores;
+    int httpPort = 0;
+    if (maxThreadsString != null) {
+      try {
+        maxThreads = Integer.parseInt(maxThreadsString.trim());
+      } catch (NumberFormatException e) {
+        logger.log(Level.WARNING, "Error", e);
+      }
+    }
+    if (cores > maxThreads) {
+      maxThreads = cores;
+    }
+
+    String portString = (String) properties.get(ServiceDriver.Port);
+    if (portString != null) {
+      try {
+        httpPort = Integer.parseInt(portString.trim());
+      } catch (NumberFormatException e) {
+        logger.log(Level.WARNING, "Error", e);
+        throw new TaskTransportException("Unable to start Server using 
provided port:" + httpPort);
+      }
+    }
+    if (httpPort == 0) { // Use any free port if none or 0 specified
       httpPort = findFreePort();
     }
-               if (appName == null) {
-                 appName = "test";
-                 logger.log(Level.WARNING, "The "+ServiceDriver.Application+" 
property is not specified - using "+appName);
-               }
-               try {
-                       // create and initialize Jetty Server
-                       server = createServer(httpPort, maxThreads, appName, 
taskProtocolHandler);
-               } catch (Exception e) {
-                       throw new TaskTransportException(e);
-               }
-    
-               // Establish the URL we could register for our customers
+    if (appName == null) {
+      appName = "test";
+      logger.log(Level.WARNING,
+              "The " + ServiceDriver.Application + " property is not specified 
- using " + appName);
+    }
+    try {
+      // create and initialize Jetty Server
+      server = createServer(httpPort, maxThreads, appName, 
taskProtocolHandler);
+    } catch (Exception e) {
+      throw new TaskTransportException(e);
+    }
+
+    // Establish the URL we could register for our customers
     String taskUrl = server.getURI().toString();
     if (taskUrl.endsWith("/")) {
       taskUrl = taskUrl.substring(0, taskUrl.length() - 1);
     }
     taskUrl += ":" + httpPort + "/" + appName;
-    logger.log(Level.INFO, "Service Driver URL: " + taskUrl); 
-    
+    logger.log(Level.INFO, "Service Driver URL: " + taskUrl);
+
     return taskUrl;
-       }
+  }
+
+  public class TaskHandlerServlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    TaskProtocolHandler taskProtocolHandler = null;
 
+    public TaskHandlerServlet(TaskProtocolHandler handler) {
+      this.taskProtocolHandler = handler;
+    }
+
+    protected void doPost(HttpServletRequest request, HttpServletResponse 
response)
+            throws ServletException, IOException {
+      try {
+        StringBuilder sb = new StringBuilder();
+        BufferedReader reader = request.getReader();
+        String line;
+        while ((line = reader.readLine()) != null) {
+          sb.append(line);
+        }
+        String content = sb.toString().trim();
+        // check ThreadLocal for a Map entry for this thread id. If not found, 
create
+        // dedicated XStream instance for this thread which will be useed to 
serialize/deserialize
+        // this thread's tasks
+
+        if (threadLocalXStream.get().get(Thread.currentThread().getId()) == 
null) {
+          threadLocalXStream.get().put(Thread.currentThread().getId(),
+                  XStreamUtils.getXStreamInstance());
+        }
+
+        IMetaTaskTransaction imt = null;
+
+        // imt = (IMetaTaskTransaction) XStreamUtils.unmarshall(content);
+        // Use dedicated instance of XStream to deserialize request
+        imt = (IMetaTaskTransaction) 
threadLocalXStream.get().get(Thread.currentThread().getId())
+                .fromXML(content);
+
+        // process service request
+        taskProtocolHandler.handle(imt);
+
+        // setup reply
+        imt.setDirection(Direction.Response);
+
+        response.setStatus(HttpServletResponse.SC_OK);
+
+        response.setHeader("content-type", "text/xml");
+        // String body = XStreamUtils.marshall(imt);
+        // Use dedicated instance of XStream to serialize reply
+        String body = 
threadLocalXStream.get().get(Thread.currentThread().getId()).toXML(imt);
+        response.getWriter().write(body);
+
+      } catch (Throwable e) {
+        throw new ServletException(e);
+      }
+    }
 
+  }
 
-       public class TaskHandlerServlet extends HttpServlet {
-               private static final long serialVersionUID = 1L;
-               TaskProtocolHandler taskProtocolHandler = null;
-
-               public TaskHandlerServlet(TaskProtocolHandler handler) {
-                       this.taskProtocolHandler = handler;
-               }
-
-               protected void doPost(HttpServletRequest request,
-                               HttpServletResponse response) throws 
ServletException,
-                               IOException {
-                       try {
-                               //long post_stime = System.nanoTime();
-                               StringBuilder sb = new StringBuilder();
-                               BufferedReader reader = request.getReader();
-                               String line;
-                               while ((line = reader.readLine()) != null) {
-                                       sb.append(line);
-                               }
-                               // char[] content = new 
char[request.getContentLength()];
-                               String content = sb.toString().trim();
-
-                               // char[] content = new 
char[request.getContentLength()];
-
-                               // request.getReader().read(content);
-                               // logger.debug("doPost",jobid,
-                               // "Http Request 
Body:::"+String.valueOf(content));
-
-                               IMetaTaskTransaction imt = null;
-                               // String t = String.valueOf(content);
-
-                               // imt = (IMetaCasTransaction) XStreamUtils
-                               // .unmarshall(t.trim());
-                               imt = (IMetaTaskTransaction) 
XStreamUtils.unmarshall(content);
-                               //MessageHandler.accumulateTimes("Unmarshall", 
post_stime);
-
-                               // process service request
-                               taskProtocolHandler.handle(imt);
-
-                               //long marshall_stime = System.nanoTime();
-                               // setup reply
-                               imt.setDirection(Direction.Response);
-
-                               response.setStatus(HttpServletResponse.SC_OK);
-
-                               response.setHeader("content-type", "text/xml");
-                               String body = XStreamUtils.marshall(imt);
-
-                               response.getWriter().write(body);
-
-                               // When debugging accumulate times taken by 
each stage of the
-                               // message processing
-                       //      MessageHandler.accumulateTimes("Marshall", 
marshall_stime);
-                       //      MessageHandler.accumulateTimes("Post", 
post_stime);
-                       } catch (Throwable e) {
-                               throw new ServletException(e);
-                       }
-               }
-
-       }
-       public static void main(String[] args) {
-               // TODO Auto-generated method stub
+  public static void main(String[] args) {
+    // TODO Auto-generated method stub
 
-       }
+  }
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
 Wed May  1 13:42:39 2019
@@ -1,27 +1,26 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+/*                                                                             
                                                                                
                                      
+ * Licensed to the Apache Software Foundation (ASF) under one                  
                                                                                
                                      
+ * or more contributor license agreements.  See the NOTICE file                
                                                                                
                                      
+ * distributed with this work for additional information                       
                                                                                
                                      
+ * regarding copyright ownership.  The ASF licenses this file                  
                                                                                
                                      
+ * to you under the Apache License, Version 2.0 (the                           
                                                                                
                                      
+ * "License"); you may not use this file except in compliance                  
                                                                                
                                      
+ * with the License.  You may obtain a copy of the License at                  
                                                                                
                                      
+ *                                                                             
                                                                                
                                      
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
                                                                                
                                      
+ *                                                                             
                                                                                
                                      
+ * Unless required by applicable law or agreed to in writing,                  
                                                                                
                                      
+ * software distributed under the License is distributed on an                 
                                                                                
                                      
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY                      
                                                                                
                                      
+ * KIND, either express or implied.  See the License for the                   
                                                                                
                                      
+ * specific language governing permissions and limitations                     
                                                                                
                                      
+ * under the License.                                                          
                                                                                
                                      
 */
 package org.apache.uima.ducc.ps.service.protocol.builtin;
 
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import java.util.HashMap;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -48,430 +47,502 @@ import org.apache.uima.ducc.ps.service.u
 import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 
+import com.thoughtworks.xstream.XStream;
+
 /**
- *
+ * 
  * This protocol handler is a Runnable
- *
+ * 
  */
 public class DefaultServiceProtocolHandler implements IServiceProtocolHandler {
-       Logger logger = 
UIMAFramework.getLogger(DefaultServiceProtocolHandler.class);
-       private volatile boolean initError = false;
-       private volatile boolean running = false;
-       private volatile boolean quiescing = false;
-       private IServiceTransport transport;
-       private IServiceProcessor processor;
-       private INoTaskAvailableStrategy noTaskStrategy;
-       // each process thread will count down the latch after intialization
-       private CountDownLatch initLatch;
-       // this PH will count the stopLatch down when it is about to stop. The 
service
-       // is the owner of this latch and awaits termination blocking in start()
-       private CountDownLatch stopLatch;
-       // each process thread block on startLatch until application calls 
start()
-       private CountDownLatch startLatch;
-       // reference to a service so that stop() can be called
-       private IService service;
-       // forces process threads to initialize serially
-       private static ReentrantLock initLock = new ReentrantLock();
-
-       private static AtomicInteger idGenerator = new AtomicInteger();
-
-       private Thread retryThread = null;
-
-       private DefaultServiceProtocolHandler(Builder builder) {
-               this.initLatch = builder.initLatch;
-               this.stopLatch = builder.stopLatch;
-               this.service = builder.service;
-               this.transport = builder.transport;
-               this.processor = builder.processor;
-               this.noTaskStrategy = builder.strategy;
-       }
-
-       private void waitForAllThreadsToInitialize() {
-               try {
-                       initLatch.await();
-               } catch (InterruptedException e) {
-                       Thread.currentThread().interrupt();
-               }
-
-       }
-
-       private void initialize() throws ServiceInitializationException {
-               // this latch blocks all process threads after initialization
-               // until application calls start()
-               startLatch = new CountDownLatch(1);
-               try {
-                       // use a lock to serialize initialization one thread at 
a time
-                       initLock.lock();
-                       if (initError) {
-                               return;
-                       }
-                       processor.initialize();
-               } catch (Throwable e) {
-                       initError = true;
-                       running = false;
-                       e.printStackTrace();
-                       logger.log(Level.WARNING, "ProtocolHandler initialize() 
failed -",e);
-                       throw new ServiceInitializationException(
-                                       "Thread:" + 
Thread.currentThread().getName() + " Failed initialization - "+  e);
-               } finally {
-
-                       initLatch.countDown();
-                       initLock.unlock();
-                       if (!initError) {
-                               // wait on startLatch
-                               waitForAllThreadsToInitialize();
-                       }
-               }
-       }
-       public boolean initialized() {
-               return ( initError==false );
-       }
-       private IMetaTaskTransaction sendAndReceive(IMetaTaskTransaction 
transaction) throws Exception {
-               TransactionId tid;
-               if (Type.Get.equals(transaction.getType())) {
-                       int major = idGenerator.addAndGet(1);
-                       int minor = 0;
-
-                       tid = new TransactionId(major, minor);
-               } else {
-                       tid = transaction.getTransactionId();
-                       // increment minor
-                       tid.next();
-               }
-
-       transaction.setRequesterProcessName(service.getType());
-       transport.addRequestorInfo(transaction);
-       IMetaTaskTransaction reply = null;
-               try {
-                       // XStream is thread safe so multiple threads can 
serialize concurrently
-                       String body = XStreamUtils.marshall(transaction);
-                       // dispatch implements waiting if no task is given by 
the driver
-                       reply = transport.dispatch(body);
-
-                       if ( Objects.isNull(reply) ) {
-                               throw new TransportException("Received invalid 
content (null) in response from client - rejecting request");
-                       }
-
-               } catch ( Exception e) {
-                       if ( !running ) {
-                               throw new TransportException("Service stopping 
- rejecting request");
-                       }
-                       throw e;
-               }
-               return reply;
-       }
-
-       private IMetaTaskTransaction callEnd(IMetaTaskTransaction transaction) 
throws Exception {
-               transaction.setType(Type.End);
-               if ( logger.isLoggable(Level.FINE)) {
-                       logger.log(Level.FINE, "ProtocolHandler calling END");
-               }
-               return sendAndReceive(transaction);
-
-       }
-
-       private IMetaTaskTransaction callAck(IMetaTaskTransaction transaction) 
throws Exception {
-               transaction.setType(Type.Ack);
-               if ( logger.isLoggable(Level.FINE)) {
-                       logger.log(Level.FINE, "ProtocolHandler calling ACK");
-               }
-               return sendAndReceive(transaction);
-       }
-       /**
-        * Fetch new task from a remote driver. This method is synchronized to 
prevent overrunning the
-        * driver when a service scales up (many threads) and out (many 
instances). Only one thread
-        * at a time is allowed to pull tasks per service instance.
-        *
-        * When the driver is out of tasks, a single thread first sleeps for 
awhile and than tries
-        * again until a task is returned.
-        *
-        * @param transaction
-        * @return
-        * @throws Exception
-        */
-       private synchronized IMetaTaskTransaction callGet(IMetaTaskTransaction 
transaction) throws Exception {
-               transaction.setType(Type.Get);
-               if ( logger.isLoggable(Level.FINE)) {
-                       logger.log(Level.FINE, "ProtocolHandler calling GET");
-               }
-               IMetaTaskTransaction metaTransaction=null;
-               boolean logOutOfTasks = true;
-               while(running) {
-                       metaTransaction = sendAndReceive(transaction);
-                       // check if driver is out of tasks
-                       if ( metaTransaction.getMetaTask() == null || 
metaTransaction.getMetaTask().getUserSpaceTask() == null) {
-                               if ( logOutOfTasks ) {
-                                       if ( logger.isLoggable(Level.FINE)) {
-                                               logger.log(Level.FINE,"Process 
Thread:"+Thread.currentThread().getId()+" - Driver is out of tasks - waiting 
for awhile ("+noTaskStrategy.getWaitTimeInMillis()+" ms) and will try again ");
-                                       }
-                                       logOutOfTasks = false;
-                               }
-                               noTaskStrategy.handleNoTaskSupplied();
-                       } else {
-                               // Got a task
-                               break;
-                       }
-               }
-               return metaTransaction;
-       }
-       /**
-        * Block until service start() is called
-        *
-        * @throws ServiceInitializationException
-        */
-       private void awaitStart() throws ServiceInitializationException {
-               try {
-                       startLatch.await();
-               } catch(InterruptedException e ) {
-                       Thread.currentThread().interrupt();
-                       throw new ServiceInitializationException("Thread 
interrupted while awaiting start()");
-               }
-       }
-
-
-       public String call() throws ServiceInitializationException, 
ServiceException {
-               // we may fail in initialize() in which case the 
ServiceInitializationException
-               // is thrown
-               initialize();
-
-               // now wait for application to call start
-               awaitStart();
-
-               // all threads intialized, enter running state
-
-               IMetaTaskTransaction transaction = null;
-
-               if ( logger.isLoggable(Level.INFO)) {
-                       logger.log(Level.INFO, ".............. Thread 
"+Thread.currentThread().getId() + " ready to process");
-               }
-
-
-               while (running) {
-
-                       try {
-                               // send GET Request
-                               transaction = callGet(new 
MetaTaskTransaction());
-                               // the code may have blocked in callGet for 
awhile, so check
-                               // if service is still running. If this service 
is in quiescing
-                               // mode, finish processing current task. The 
while-loop will
-                               // terminate when the task is finished.
-                               if ( !running && !quiescing  ) {
-                                       break;
-                               }
-                               // transaction may be null if 
retryUntilSuccessfull was interrupted
-                               // due to stop
-                               if (Objects.isNull(transaction) || (!running  
&& !quiescing)) {
-                                       break;
-                               }
-                               logger.log(Level.FINE, ".............. Thread 
"+Thread.currentThread().getId() + " processing new task");
-                               if ( Objects.isNull(transaction.getMetaTask()) 
) {
-                                       // this should only be the case when 
the service is stopping and transport is shutdown
-                                   if ( running ) {
-                                       logger.log(Level.INFO, ".............. 
Thread "+Thread.currentThread().getId() + " GET returned null MetaTask while 
service is in a running state - this is unexpected");
-                                   }
-                                   // if !running, the while loop above will 
terminate
-                                   continue;
-                               }
-
-                               Object task = 
transaction.getMetaTask().getUserSpaceTask();
-
-                               // send ACK
-                               transaction = callAck(transaction);
-                               if (!running  && !quiescing ) {
-                                       break;
-                               }
-                               IProcessResult processResult = 
processor.process((String) task);
-
-                               // assume success
-                               Action action = Action.CONTINUE;
-                               // check if process error occurred.
-                               String errorAsString = processResult.getError();
-
-                               if (processResult.terminateProcess()) {
-                                       action = Action.TERMINATE;
-                               } else if ( Objects.isNull(errorAsString)){
-                                       // success
-                                       
transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
-                               }
-                               if ( Objects.nonNull(errorAsString ) ) {
-                                       IMetaTask mc = 
transaction.getMetaTask();
-                                       // the ducc.deploy.JpType is only 
present for jobs. If not specified
-                                       // we return stringified exception to 
the client. The JD expects
-                                       // Java Exception object for its error 
handling
-                                       if ( 
Objects.isNull(System.getProperty("ducc.deploy.JpType")) ) {
-
-                                               
mc.setUserSpaceException(errorAsString);
-                                       } else {
-                                               logger.log(Level.INFO, "Sending 
Exception to JD:\n" +
-                                                               
((Exception)processResult.getExceptionObject()));
-                                               // JD expects serialized 
exception as byte[]
-                                               
mc.setUserSpaceException(serializeError(processResult.getExceptionObject()));
-                                       }
-
-                               }
-
-                               // send END Request
-                               callEnd(transaction);
-                               if (running && Action.TERMINATE.equals(action)) 
{
-                                       logger.log(Level.WARNING, "Processor 
Failure - Action=Terminate");
-                                       // Can't stop using the current thread. 
This thread
-                                       // came from a thread pool we want to 
stop. Need
-                                       // a new/independent thread to call 
stop()
-                                       new Thread(new Runnable() {
-
-                                               @Override
-                                               public void run() {
-                                                       delegateStop();
-                                               }
-                                       }).start();
-                                       running = false;
-                               }
-
-
-
-                       } catch( IllegalStateException e) {
-                               break;
-                       } catch( TransportException e) {
-                               break;
-                       }
-                       catch (Exception e) {
-
-                               logger.log(Level.WARNING,"",e);
-                       }
-               }
-               stopLatch.countDown();
-               System.out.println(Utils.getTimestamp()+">>>>>>> 
"+Utils.getShortClassname(this.getClass())+".call() >>>>>>>>>> Thread 
["+Thread.currentThread().getId()+"] "+ " ProtocolHandler stopped requesting 
new tasks - Stopping processor");
-               logger.log(Level.INFO,"ProtocolHandler stopped requesting new 
tasks - Stopping processor");
-
-               if ( processor != null ) {
-                       processor.stop();
-               }
-               return String.valueOf(Thread.currentThread().getId());
-       }
-
-    private byte[] serializeError(Throwable t) throws Exception {
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               ObjectOutputStream oos = new ObjectOutputStream(baos);
-
-               try {
-                       oos.writeObject(t);
-               } catch (Exception e) {
-                       try {
-                               logger.log(Level.WARNING, "Unable to Serialize 
"+t.getClass().getName()+" - Will Stringify It Instead");
-
-                       } catch( Exception ee) {}
-                       throw e;
-               } finally {
-                       oos.close();
-               }
-
-               return baos.toByteArray();
-       }
-       private void delegateStop() {
-          service.quiesceAndStop();
-       }
-       @Override
-       public void stop() {
-               quiescing = false;
-               running = false;
-               try {
-                       // use try catch to handle a possible race condition
-                       // when retryThread is not null, but it becomes null
-                       // before we call interrupt causing NPE. All this would
-                       // mean is that retryUntilSuccess() succeeded.
-                       if ( retryThread != null ) {
-                               retryThread.interrupt();
-                       }
-               } catch( Exception ee) {
-               }               //noTaskStrategy.interrupt();
-               if ( logger.isLoggable(Level.INFO)) {
-                       logger.log(Level.INFO, this.getClass().getName()+" 
stop() called");
-               }
-       }
-       @Override
-       public void quiesceAndStop() {
-
-               // Use System.out since the logger's ShutdownHook may have 
closed streams
-               System.out.println(Utils.getTimestamp()+">>>>>>> 
"+Utils.getShortClassname(this.getClass())+".queisceAndStop()");
-               logger.log(Level.INFO, this.getClass().getName()+" 
quiesceAndStop() called");
-               // change state of transport to not running but keep connection 
open
-               // so that other threads can quiesce (send results)
-               transport.stop(true);
-
-               quiescing = true;
-               running = false;
-               try {
-                       // use try catch to handle a possible race condition
-                       // when retryThread is not null, but it becomes null
-                       // before we call interrupt causing NPE. All this would
-                       // mean is that retryUntilSuccess() succeeded.
-                       if ( retryThread != null ) {
-                               retryThread.interrupt();
-                       }
-               } catch( Exception ee) {
-               }
-               try {
-                       // wait for process threads to terminate
-                       stopLatch.await();
-               } catch( Exception e ) {
-
-               }
-               // Use System.out since the logger's ShutdownHook may have 
closed streams
-               System.out.println(Utils.getTimestamp()+">>>>>>> 
"+Utils.getShortClassname(this.getClass())+".queisceAndStop() All process 
threads completed quiesce");
-               logger.log(Level.INFO, this.getClass().getName()+" All process 
threads completed quiesce");
-       }
-       @Override
-       public void start() {
-               running = true;
-               // process threads are initialized and are awaiting latch 
countdown
-               startLatch.countDown();
-       }
-       @Override
-       public void setServiceProcessor(IServiceProcessor processor) {
-               this.processor = processor;
-       }
-
-       @Override
-       public void setTransport(IServiceTransport transport) {
-               this.transport = transport;
-       }
-
-
-        public static class Builder {
-                       private IServiceTransport transport;
-                       private IServiceProcessor processor;
-                       private INoTaskAvailableStrategy strategy;
-                       // each thread will count down the latch
-                       private CountDownLatch initLatch;
-                       private CountDownLatch stopLatch;
-                       private IService service;
-
-                       public Builder withTransport(IServiceTransport 
transport) {
-                               this.transport = transport;
-                               return this;
-                       }
-                       public Builder withProcessor(IServiceProcessor 
processor) {
-                               this.processor = processor;
-                               return this;
-                       }
-                       public Builder withInitCompleteLatch(CountDownLatch 
initLatch) {
-                               this.initLatch = initLatch;
-                               return this;
-                       }
-                       public Builder withDoneLatch(CountDownLatch stopLatch) {
-                               this.stopLatch = stopLatch;
-                               return this;
-                       }
-                       public Builder 
withNoTaskStrategy(INoTaskAvailableStrategy strategy) {
-                               this.strategy = strategy;
-                               return this;
-                       }
-                       public Builder withService(IService service) {
-                               this.service = service;
-                               return this;
-                       }
-                       public DefaultServiceProtocolHandler build() {
-                   return new DefaultServiceProtocolHandler(this);
-               }
-        }
+  Logger logger = UIMAFramework.getLogger(DefaultServiceProtocolHandler.class);
+
+  private volatile boolean initError = false;
+
+  private volatile boolean running = false;
+
+  private volatile boolean quiescing = false;
+
+  private IServiceTransport transport;
+
+  private IServiceProcessor processor;
+
+  private INoTaskAvailableStrategy noTaskStrategy;
+
+  // each process thread will count down the latch after intialization
+  private CountDownLatch initLatch;
+
+  // this PH will count the stopLatch down when it is about to stop. The 
service
+  // is the owner of this latch and awaits termination blocking in start()
+  private CountDownLatch stopLatch;
+
+  // each process thread block on startLatch until application calls start()
+  private CountDownLatch startLatch;
+
+  // reference to a service so that stop() can be called
+  private IService service;
+
+  // forces process threads to initialize serially
+  private static ReentrantLock initLock = new ReentrantLock();
+
+  private ReentrantLock noWorkLock = new ReentrantLock();
+
+  private static AtomicInteger idGenerator = new AtomicInteger();
+
+  private Thread retryThread = null;
+
+  // Create ThreadLocal Map containing instances of XStream for each thread
+  private ThreadLocal<HashMap<Long, XStream>> threadLocalXStream = new 
ThreadLocal<HashMap<Long, XStream>>() {
+    @Override
+    protected HashMap<Long, XStream> initialValue() {
+      return new HashMap<>();
+    }
+  };
+
+  private DefaultServiceProtocolHandler(Builder builder) {
+    this.initLatch = builder.initLatch;
+    this.stopLatch = builder.stopLatch;
+    this.service = builder.service;
+    this.transport = builder.transport;
+    this.processor = builder.processor;
+    this.noTaskStrategy = builder.strategy;
+  }
+
+  private void waitForAllThreadsToInitialize() {
+    try {
+      initLatch.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+  }
+
+  private void initialize() throws ServiceInitializationException {
+    // this latch blocks all process threads after initialization
+    // until application calls start()
+    startLatch = new CountDownLatch(1);
+    try {
+      // use a lock to serialize initialization one thread at a time
+      initLock.lock();
+      if (initError) {
+        return;
+      }
+      processor.initialize();
+    } catch (Throwable e) {
+      initError = true;
+      running = false;
+      logger.log(Level.WARNING, "ProtocolHandler initialize() failed -", e);
+      throw new ServiceInitializationException(
+              "Thread:" + Thread.currentThread().getName() + " Failed 
initialization - " + e);
+    } finally {
+
+      initLatch.countDown();
+      initLock.unlock();
+      if (!initError) {
+        // wait on startLatch
+        waitForAllThreadsToInitialize();
+      }
+    }
+  }
+
+  public boolean initialized() {
+    return (initError == false);
+  }
+
+  private IMetaTaskTransaction sendAndReceive(IMetaTaskTransaction 
transaction) throws Exception {
+    TransactionId tid;
+    if (Type.Get.equals(transaction.getType())) {
+      int major = idGenerator.addAndGet(1);
+      int minor = 0;
+
+      tid = new TransactionId(major, minor);
+    } else {
+      tid = transaction.getTransactionId();
+      // increment minor
+      tid.next();
+    }
+
+    transaction.setRequesterProcessName(service.getType());
+    transport.addRequestorInfo(transaction);
+    IMetaTaskTransaction reply = null;
+    try {
+      // XStream is thread safe so multiple threads can serialize concurrently
+      // String body = XStreamUtils.marshall(transaction);
+      String body = 
threadLocalXStream.get().get(Thread.currentThread().getId()).toXML(transaction);
+      // dispatch implements waiting if no task is given by the driver
+      reply = transport.dispatch(body, threadLocalXStream);
+
+      if (Objects.isNull(reply)) {
+        throw new TransportException(
+                "Received invalid content (null) in response from client - 
rejecting request");
+      }
+
+    } catch (Exception e) {
+      if (!running) {
+        throw new TransportException("Service stopping - rejecting request");
+      }
+      throw e;
+    }
+    return reply;
+  }
+
+  private IMetaTaskTransaction callEnd(IMetaTaskTransaction transaction) 
throws Exception {
+    transaction.setType(Type.End);
+    if (logger.isLoggable(Level.FINE)) {
+      logger.log(Level.FINE, "ProtocolHandler calling END");
+    }
+    return sendAndReceive(transaction);
+
+  }
+
+  private IMetaTaskTransaction callAck(IMetaTaskTransaction transaction) 
throws Exception {
+    transaction.setType(Type.Ack);
+    if (logger.isLoggable(Level.FINE)) {
+      logger.log(Level.FINE, "ProtocolHandler calling ACK");
+    }
+    return sendAndReceive(transaction);
+  }
+
+  /**
+   * Fetch new task from a remote driver.
+   * 
+   * When the driver is out of tasks, a single thread first sleeps for awhile 
and than tries again
+   * until a task is returned.
+   * 
+   * @param transaction
+   * @return
+   * @throws Exception
+   */
+  private IMetaTaskTransaction callGet(IMetaTaskTransaction transaction) 
throws Exception {
+    transaction.setType(Type.Get);
+    if (logger.isLoggable(Level.FINE)) {
+      logger.log(Level.FINE, "ProtocolHandler calling GET");
+    }
+    IMetaTaskTransaction metaTransaction = null;
+
+    while (running) {
+      metaTransaction = sendAndReceive(transaction);
+      if (metaTransaction.getMetaTask() != null
+              && metaTransaction.getMetaTask().getUserSpaceTask() != null) {
+        return metaTransaction;
+      }
+
+      // If the first thread to get the lock poll for work and unlock when 
work found
+      // If don't immediately get the lock then wait for the lock to be 
released when
+      // work becomes available,
+      // and immediately release the lock and loop back to retry
+      boolean firstLocker = noWorkLock.tryLock();
+      if (!firstLocker) {
+        noWorkLock.lock();
+        noWorkLock.unlock();
+        continue;
+      }
+
+      // If the first one here hold the lock and sleep before retrying
+      if (logger.isLoggable(Level.INFO)) {
+        logger.log(Level.INFO, "Driver is out of tasks - waiting for "
+                + noTaskStrategy.getWaitTimeInMillis() + "ms before trying 
again ");
+      }
+      while (running) {
+        noTaskStrategy.handleNoTaskSupplied();
+        metaTransaction = sendAndReceive(transaction);
+        if (metaTransaction.getMetaTask() != null
+                && metaTransaction.getMetaTask().getUserSpaceTask() != null) {
+          noWorkLock.unlock();
+          return metaTransaction;
+        }
+      }
+    }
+    ;
+
+    return metaTransaction; // When shutting down
+  }
+
+  /**
+   * Block until service start() is called
+   * 
+   * @throws ServiceInitializationException
+   */
+  private void awaitStart() throws ServiceInitializationException {
+    try {
+      startLatch.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ServiceInitializationException("Thread interrupted while 
awaiting start()");
+    }
+  }
+
+  public String call() throws ServiceInitializationException, ServiceException 
{
+    // we may fail in initialize() in which case the 
ServiceInitializationException
+    // is thrown
+    initialize();
+
+    // now wait for application to call start
+    awaitStart();
+
+    // check ThreadLocal for a Map entry for this thread id. If not found, 
create
+    // dedicated XStream instance for this thread which will be useed to 
serialize/deserialize
+    // this thread's tasks
+    if (threadLocalXStream.get().get(Thread.currentThread().getId()) == null) {
+      threadLocalXStream.get().put(Thread.currentThread().getId(),
+              XStreamUtils.getXStreamInstance());// new XStream(new 
DomDriver()));
+    }
+    // all threads intialized, enter running state
+
+    IMetaTaskTransaction transaction = null;
+
+    if (logger.isLoggable(Level.INFO)) {
+      logger.log(Level.INFO,
+              ".............. Thread " + Thread.currentThread().getId() + " 
ready to process");
+    }
+
+    while (running) {
+
+      try {
+        // send GET Request
+        transaction = callGet(new MetaTaskTransaction());
+        // the code may have blocked in callGet for awhile, so check
+        // if service is still running. If this service is in quiescing
+        // mode, finish processing current task. The while-loop will
+        // terminate when the task is finished.
+        if (!running && !quiescing) {
+          break;
+        }
+        // transaction may be null if retryUntilSuccessfull was interrupted
+        // due to stop
+        if (Objects.isNull(transaction) || (!running && !quiescing)) {
+          break;
+        }
+        logger.log(Level.FINE,
+                ".............. Thread " + Thread.currentThread().getId() + " 
processing new task");
+        if (Objects.isNull(transaction.getMetaTask())) {
+          // this should only be the case when the service is stopping and 
transport is
+          // shutdown
+          if (running) {
+            logger.log(Level.INFO, ".............. Thread " + 
Thread.currentThread().getId()
+                    + " GET returned null MetaTask while service is in a 
running state - this is unexpected");
+          }
+          // if !running, the while loop above will terminate
+          continue;
+        }
+
+        Object task = transaction.getMetaTask().getUserSpaceTask();
+
+        // send ACK
+        transaction = callAck(transaction);
+        if (!running && !quiescing) {
+          break;
+        }
+        IProcessResult processResult = processor.process((String) task);
+
+        // assume success
+        Action action = Action.CONTINUE;
+        // check if process error occurred.
+        String errorAsString = processResult.getError();
+
+        if (processResult.terminateProcess()) {
+          action = Action.TERMINATE;
+        } else if (Objects.isNull(errorAsString)) {
+          // success
+          
transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
+        }
+        if (Objects.nonNull(errorAsString)) {
+          IMetaTask mc = transaction.getMetaTask();
+          // the ducc.deploy.JpType is only present for jobs. If not specified
+          // we return stringified exception to the client. The JD expects
+          // Java Exception object for its error handling
+          if (Objects.isNull(System.getProperty("ducc.deploy.JpType"))) {
+
+            mc.setUserSpaceException(errorAsString);
+          } else {
+            logger.log(Level.INFO, "Sending Exception to JD:\n"
+                    + ((Exception) processResult.getExceptionObject()));
+            // JD expects serialized exception as byte[]
+            
mc.setUserSpaceException(serializeError(processResult.getExceptionObject()));
+          }
+
+        }
+
+        // send END Request
+        callEnd(transaction);
+        if (running && Action.TERMINATE.equals(action)) {
+          logger.log(Level.WARNING, "Processor Failure - Action=Terminate");
+          // Can't stop using the current thread. This thread
+          // came from a thread pool we want to stop. Need
+          // a new/independent thread to call stop()
+          new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+              delegateStop();
+            }
+          }).start();
+          running = false;
+        }
+
+      } catch (IllegalStateException e) {
+        break;
+      } catch (TransportException e) {
+        break;
+      } catch (Exception e) {
+        logger.log(Level.WARNING, "", e);
+        break;
+      }
+    }
+    stopLatch.countDown();
+    System.out.println(Utils.getTimestamp() + ">>>>>>> " + 
Utils.getShortClassname(this.getClass())
+            + ".call() >>>>>>>>>> Thread [" + Thread.currentThread().getId() + 
"] "
+            + " ProtocolHandler stopped requesting new tasks - Stopping 
processor");
+    logger.log(Level.INFO, "ProtocolHandler stopped requesting new tasks - 
Stopping processor");
+
+    if (processor != null) {
+      processor.stop();
+    }
+    return String.valueOf(Thread.currentThread().getId());
+  }
+
+  private byte[] serializeError(Throwable t) throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+    try {
+      oos.writeObject(t);
+    } catch (Exception e) {
+      try {
+        logger.log(Level.WARNING,
+                "Unable to Serialize " + t.getClass().getName() + " - Will 
Stringify It Instead");
+
+      } catch (Exception ee) {
+      }
+      throw e;
+    } finally {
+      oos.close();
+    }
+
+    return baos.toByteArray();
+  }
+
+  private void delegateStop() {
+    service.quiesceAndStop();
+  }
+
+  @Override
+  public void stop() {
+    quiescing = false;
+    running = false;
+    try {
+      // use try catch to handle a possible race condition
+      // when retryThread is not null, but it becomes null
+      // before we call interrupt causing NPE. All this would
+      // mean is that retryUntilSuccess() succeeded.
+      if (retryThread != null) {
+        retryThread.interrupt();
+      }
+    } catch (Exception ee) {
+    } // noTaskStrategy.interrupt();
+    if (logger.isLoggable(Level.INFO)) {
+      logger.log(Level.INFO, this.getClass().getName() + " stop() called");
+    }
+  }
+
+  @Override
+  public void quiesceAndStop() {
+
+    // Use System.out since the logger's ShutdownHook may have closed streams
+    System.out.println(Utils.getTimestamp() + ">>>>>>> " + 
Utils.getShortClassname(this.getClass())
+            + ".queisceAndStop()");
+    logger.log(Level.INFO, this.getClass().getName() + " quiesceAndStop() 
called");
+    // change state of transport to not running but keep connection open
+    // so that other threads can quiesce (send results)
+    transport.stop(true);
+
+    quiescing = true;
+    running = false;
+    try {
+      // use try catch to handle a possible race condition
+      // when retryThread is not null, but it becomes null
+      // before we call interrupt causing NPE. All this would
+      // mean is that retryUntilSuccess() succeeded.
+      if (retryThread != null) {
+        retryThread.interrupt();
+      }
+    } catch (Exception ee) {
+    }
+    try {
+      // wait for process threads to terminate
+      stopLatch.await();
+    } catch (Exception e) {
+
+    }
+    // Use System.out since the logger's ShutdownHook may have closed streams
+    System.out.println(Utils.getTimestamp() + ">>>>>>> " + 
Utils.getShortClassname(this.getClass())
+            + ".queisceAndStop() All process threads completed quiesce");
+    logger.log(Level.INFO, this.getClass().getName() + " All process threads 
completed quiesce");
+  }
+
+  @Override
+  public void start() {
+    running = true;
+    // process threads are initialized and are awaiting latch countdown
+    startLatch.countDown();
+  }
+
+  @Override
+  public void setServiceProcessor(IServiceProcessor processor) {
+    this.processor = processor;
+  }
+
+  @Override
+  public void setTransport(IServiceTransport transport) {
+    this.transport = transport;
+  }
+
+  public static class Builder {
+    private IServiceTransport transport;
+
+    private IServiceProcessor processor;
+
+    private INoTaskAvailableStrategy strategy;
+
+    // each thread will count down the latch
+    private CountDownLatch initLatch;
+
+    private CountDownLatch stopLatch;
+
+    private IService service;
+
+    public Builder withTransport(IServiceTransport transport) {
+      this.transport = transport;
+      return this;
+    }
+
+    public Builder withProcessor(IServiceProcessor processor) {
+      this.processor = processor;
+      return this;
+    }
+
+    public Builder withInitCompleteLatch(CountDownLatch initLatch) {
+      this.initLatch = initLatch;
+      return this;
+    }
+
+    public Builder withDoneLatch(CountDownLatch stopLatch) {
+      this.stopLatch = stopLatch;
+      return this;
+    }
+
+    public Builder withNoTaskStrategy(INoTaskAvailableStrategy strategy) {
+      this.strategy = strategy;
+      return this;
+    }
+
+    public Builder withService(IService service) {
+      this.service = service;
+      return this;
+    }
+
+    public DefaultServiceProtocolHandler build() {
+      return new DefaultServiceProtocolHandler(this);
+    }
+  }
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
 Wed May  1 13:42:39 2019
@@ -18,14 +18,20 @@
 */
 package org.apache.uima.ducc.ps.service.transport;
 
+import java.util.HashMap;
+
 import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
 import org.apache.uima.ducc.ps.service.IServiceComponent;
 import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
 
+import com.thoughtworks.xstream.XStream;
+
 public interface IServiceTransport extends IServiceComponent {
        // called by Protocal Handler. Any errors will be handled
        // by instance of IServiceErrorHandler
-       public IMetaTaskTransaction dispatch(String request) throws 
TransportException;
+//     public IMetaTaskTransaction dispatch(String request) throws 
TransportException;
+       public IMetaTaskTransaction dispatch(String serializedRequest, 
ThreadLocal<HashMap<Long, XStream>> localXStream) throws TransportException;
+
        // initialize transport
        public void initialize() throws ServiceInitializationException; 
        // stop transport

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
 Wed May  1 13:42:39 2019
@@ -45,4 +45,9 @@ public class XStreamUtils {
                return xStream.fromXML(targetToUnmarshall);
         }
        }
+       public static XStream getXStreamInstance() {
+               XStream xStream = new XStream(new DomDriver());
+               initXStreanSecurity(xStream);
+               return xStream;
+       }
 }


Reply via email to