Added: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java?rev=1836115&view=auto
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
 (added)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
 Tue Jul 17 13:49:47 2018
@@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.processor.uima;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.lang.reflect.Method;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.ducc.ps.service.IScaleable;
+import org.apache.uima.ducc.ps.service.IServiceState;
+import org.apache.uima.ducc.ps.service.ServiceConfiguration;
+import org.apache.uima.ducc.ps.service.dgen.DeployableGenerator;
+import org.apache.uima.ducc.ps.service.dgen.DuccUimaReferenceByName;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
+import org.apache.uima.ducc.ps.service.monitor.builtin.RemoteStateObserver;
+import org.apache.uima.ducc.ps.service.processor.IProcessResult;
+import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
+import org.apache.uima.ducc.ps.service.processor.IServiceResultSerializer;
+import org.apache.uima.ducc.ps.service.processor.uima.utils.PerformanceMetrics;
+import 
org.apache.uima.ducc.ps.service.processor.uima.utils.UimaResultDefaultSerializer;
+import org.apache.uima.ducc.ps.service.utils.UimaSerializer;
+import org.apache.uima.ducc.ps.service.utils.Utils;
+import org.apache.uima.resource.metadata.impl.TypeSystemDescription_impl;
+import org.apache.uima.util.CasCreationUtils;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
+
+public class UimaAsServiceProcessor extends AbstractServiceProcessor 
implements IServiceProcessor, IScaleable {
+       private static final Class<?> CLASS_NAME = UimaAsServiceProcessor.class;
+
+       Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
+       // Map to store DuccUimaSerializer instances. Each has affinity to a 
thread
+//     private Map<Long, UimaSerializer> serializerMap = new HashMap<>();
+       private static Object brokerInstance = null;
+       private UimaAsClientWrapper uimaASClient = null;
+       private String saxonURL = null;
+       private String xslTransform = null;
+       protected Object initializeMonitor = new Object();
+       private static volatile boolean brokerRunning = false;
+       private static final char FS = 
System.getProperty("file.separator").charAt(0);
+
+       private static final CountDownLatch brokerLatch = new CountDownLatch(1);
+       public static final String brokerPropertyName = "ducc.broker.name";
+       public static final String queuePropertyName = "ducc.queue.name";
+       public static final String duccNodeName = "DUCC_NODENAME";
+       private IServiceResultSerializer resultSerializer;
+       private static Class<?> classToLaunch = null;
+       private String aeName = "";
+       private int scaleout = 1;
+       private static Object platformMBeanServer;
+       private ServiceConfiguration serviceConfiguration;
+       private IServiceMonitor monitor;
+       public volatile boolean initialized = false;
+       private String[] deploymentDescriptors = null;
+       private String[] ids = null;
+       private String duccHome = null;
+       boolean enablePerformanceBreakdownReporting = false;
+       private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
+
+       static {
+               // try to get platform MBean Server (Java 1.5 only)
+               try {
+                       Class<?> managementFactory = 
Class.forName("java.lang.management.ManagementFactory");
+                       Method getPlatformMBeanServer = 
managementFactory.getMethod("getPlatformMBeanServer", new Class[0]);
+                       platformMBeanServer = 
getPlatformMBeanServer.invoke(null, (Object[]) null);
+               } catch (Exception e) {
+                       platformMBeanServer = null;
+               }
+       }
+       private String[] args;
+
+       public UimaAsServiceProcessor(String[] args, ServiceConfiguration 
serviceConfiguration) {
+               this.args = args;
+               this.serviceConfiguration = serviceConfiguration;
+               // start a thread which will collect AE initialization state
+               launchStateInitializationCollector();
+
+       }
+
+       @Override
+       public void setScaleout(int howManyThreads) {
+               scaleout = howManyThreads;
+       }
+
+       @Override
+       public int getScaleout() {
+               return scaleout;
+       }
+
+       private int generateDescriptorsAndGetScaleout(String[] args) throws 
Exception {
+               deploymentDescriptors = getDescriptors(args);
+               ids = new String[deploymentDescriptors.length];
+               return scaleout;
+       }
+
+       private void enableMetricsIfNewUimaAs() throws Exception{
+               // inner class hiding java reflection code to access uima-as 
class
+               UimaAsVersionWrapper uimaVersion = new UimaAsVersionWrapper();
+               
+               int majorVersion = uimaVersion.getMajor(); 
+               int minorVersion = uimaVersion.getMinor(); 
+               int buildRevision = uimaVersion.getRevision(); 
+
+               // enable performance breakdown reporting when support is added 
in the next UIMA
+               // AS release after 2.6.0
+               // (assumes the fix will be after the current 2.6.1-SNAPSHOT 
level)
+               if (majorVersion > 2 || (majorVersion == 2 && (minorVersion > 6 
|| (minorVersion == 6 && buildRevision > 1)))) {
+                       enablePerformanceBreakdownReporting = true;
+               }
+       }
+       private void launchStateInitializationCollector() {
+               monitor =
+                               new RemoteStateObserver(serviceConfiguration, 
logger);
+       }
+       @Override
+       public void initialize() throws ServiceInitializationException {
+               try {
+                       duccHome = System.getProperty("DUCC_HOME");
+                       String pid = Utils.getPID("Queue");
+                       String endpointName; // This is for the local 
within-process UIMA-AS client/server
+                       if (System.getenv(duccNodeName) != null) {
+                               endpointName = System.getenv(duccNodeName) + 
pid;
+                       } else {
+                               endpointName = 
InetAddress.getLocalHost().getCanonicalHostName() + pid;
+                       }
+                       // Needed to resolve ${queue.name} placeholder in DD 
generated by DUCC
+                       System.setProperty(queuePropertyName, endpointName);
+
+                       // generate Spring context file once
+                       synchronized (UimaAsServiceProcessor.class) {
+                               // every process thread has its own uima 
deserializer
+                               
serializerMap.put(Thread.currentThread().getId(), new UimaSerializer());
+                               if (!initialized) {
+                                       
monitor.onStateChange(IServiceState.State.Initializing.toString(), new 
Properties());
+
+                                       Properties duccProperties = 
loadDuccProperties();
+                                       String saxonPath = 
+                                                       
resolvePlaceholders(duccProperties.getProperty("ducc.uima-as.saxon.jar.path"), 
System.getProperties() ); 
+                                       String xsltPath = 
+                                                       
resolvePlaceholders(duccProperties.getProperty("ducc.uima-as.dd2spring.xsl.path"),
 System.getProperties() ); 
+                                       List<String> argList = new 
ArrayList<>();
+                                       argList.add("-d");
+                                       argList.add(args[0]);
+                                       argList.add("-saxonURL");
+                                       argList.add(saxonPath);
+                                       argList.add("-xslt");
+                                       argList.add(xsltPath);
+                                       
+                                       if ( 
System.getProperty("ducc.deploy.JpThreadCount") != null ) {
+                                               argList.add("-t");
+                                               
argList.add(System.getProperty("ducc.deploy.JpThreadCount"));
+                                       }
+                               
+                                       enableMetricsIfNewUimaAs();
+                                       resultSerializer = new 
UimaResultDefaultSerializer();
+                                       uimaASClient = new 
UimaAsClientWrapper(); 
+                                       scaleout = 
generateDescriptorsAndGetScaleout(argList.toArray(new String[argList.size()])); 
// Also converts the DD if necessary
+                                       if ( scaleout == 0 ) {
+                                               scaleout = 1;
+                                       }
+                                       initialized = true;
+                               }
+                               doDeploy();
+                       }
+                       if ( numberOfInitializedThreads.incrementAndGet() == 
scaleout ) {
+                               super.delay(logger, 30000);
+                               
monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
+                       }
+
+                       
+               } catch( Exception e) {
+                       logger.log(Level.WARNING, null, e);
+                       
monitor.onStateChange(IServiceState.State.FailedInitialization.toString(), new 
Properties());
+                       throw new ServiceInitializationException("",e);
+               }
+
+       }
+       public String resolvePlaceholders(String contents, Properties props ) 
+    {
+        //  Placeholders syntax ${<placeholder>}
+        Pattern placeHolderPattern = Pattern.compile("\\$\\{(.*?)\\}");
+      
+        java.util.regex.Matcher matcher = 
+            placeHolderPattern.matcher(contents); 
+
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            // extract placeholder
+            final String key = matcher.group(1);
+            //  Find value for extracted placeholder. 
+            String placeholderValue = props.getProperty(key);
+            if (placeholderValue == null) {
+                placeholderValue = System.getProperty(key);
+                if (placeholderValue == null) {
+                    throw new IllegalArgumentException("Missing value for 
placeholder: " + key);
+                }
+            }
+            matcher.appendReplacement(sb, placeholderValue);        
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+       }
+       @Override
+       public IProcessResult process(String serializedTask) {
+               CAS cas = null;
+               IProcessResult result;
+               try {
+                       cas = uimaASClient.getCAS();
+                       // DUCC JP  services are given a serialized CAS ... 
others just the doc-text for a CAS
+                       if (serviceConfiguration.getJpType() != null) {
+                               // Use thread dedicated UimaSerializer to 
de-serialize the CAS
+                               
getUimaSerializer().deserializeCasFromXmi(serializedTask, cas);
+                       } else {
+                               cas.setDocumentText(serializedTask);
+                               cas.setDocumentLanguage("en");
+                       }
+
+                       List<PerformanceMetrics> casMetrics = new ArrayList<>();
+
+                       if (enablePerformanceBreakdownReporting) {
+                               List<?> perfMetrics = new ArrayList<>();
+
+                               try {
+                                       uimaASClient.sendAndReceive(cas, 
perfMetrics);
+                               } catch (Exception t) {
+                                       logger.log(Level.WARNING, "", t);
+                                       result = new UimaProcessResult(t, 
Action.TERMINATE);
+                                       return result;
+                               }
+
+                               for (Object metrics : perfMetrics) {
+                                       Method nameMethod = 
metrics.getClass().getDeclaredMethod("getName");
+                                       String name = (String) 
nameMethod.invoke(metrics);
+                                       Method uniqueNameMethod = 
metrics.getClass().getDeclaredMethod("getUniqueName");
+                                       String uniqueName = (String) 
uniqueNameMethod.invoke(metrics);
+                                       Method analysisTimeMethod = 
metrics.getClass().getDeclaredMethod("getAnalysisTime");
+                                       long analysisTime = (long) 
analysisTimeMethod.invoke(metrics);
+
+                                       boolean aggregate = 
uniqueName.startsWith("/" + name);
+                                       int pos = uniqueName.indexOf("/", 1);
+                                       if (pos > -1 && scaleout > 1 && name != 
null && aggregate) {
+                                               String st = 
uniqueName.substring(pos);
+                                               uniqueName = "/" + name + st;
+                                       }
+                                       PerformanceMetrics pm = new 
PerformanceMetrics(name, uniqueName, analysisTime);
+                                       casMetrics.add(pm);
+                               }
+                       } else {
+                               // delegate processing to the UIMA-AS service 
and wait for a reply
+                               try {
+                                       uimaASClient.sendAndReceive(cas);
+                               } catch (Exception t) {
+                                       logger.log(Level.WARNING, "", t);
+                                       result = new UimaProcessResult(t, 
Action.TERMINATE);
+                                       return result;
+                               }                                               
                                 
+                               PerformanceMetrics pm = new PerformanceMetrics(
+                                               "Performance Metrics Not 
Supported For DD Jobs and UIMA-AS <= v2.6.0",
+                                               "Performance Metrics Not 
Supported For DD Jobs and UIMA-AS <= v2.6.0 ", 0);
+                               casMetrics.add(pm);
+
+                       }
+                       return new 
UimaProcessResult(resultSerializer.serialize(casMetrics));
+
+               } catch (Exception t) {
+                       logger.log(Level.WARNING, "", t);
+                       result = new UimaProcessResult(t, Action.TERMINATE);
+                       return result;
+               } finally {
+                       if (cas != null) {
+                               cas.release();
+                       }
+               }
+       }
+       private void doDeploy() throws Exception {
+               // deploy singleUIMA-AS Version instance of embedded broker
+                       try {
+                               // below code runs once to create broker, 
uima-as client and
+                               // uima-as service
+                               if (brokerInstance == null) {
+                                       deployBroker(duccHome);
+                                       // Broker is running
+                                       brokerRunning = true;
+
+                                       int i = 0;
+                                       // Deploy UIMA-AS services
+                                       for (String dd : deploymentDescriptors) 
{
+                                               // Deploy UIMA-AS service. Keep 
the deployment id so
+                                               // that we can undeploy uima-as 
service on stop.
+                                               ids[i] = 
uimaASClient.deployService(dd);
+                                       }
+                                       // send GetMeta to UIMA-AS service and 
wait for a reply
+                                       uimaASClient.initialize(); 
+                               }
+
+                       } catch (Throwable e) {
+                               logger.log(Level.WARNING, 
"UimaAsServiceProcesser", e);
+                               throw new RuntimeException(e);
+
+                       }
+       }
+       private Properties loadDuccProperties() throws Exception {
+               Properties duccProperties = new Properties();
+               duccProperties.load(new 
FileInputStream(System.getProperty("ducc.deploy.configuration")));
+               return duccProperties;
+       }
+       /**
+        * Extract descriptors from arg list. Also extract xsl processor and 
saxon url.
+        * Parse the DD to fetch scaleout property & convert the DD if 
necessary.
+        *
+        * @param args
+        *            - java argument list
+        * @return - an array of DDs
+        *
+        * @throws Exception
+        */
+       private String[] getDescriptors(String[] args) throws Exception {
+               int nbrOfArgs = args.length;
+               String[] deploymentDescriptors = Utils.getMultipleArg("-d", 
args);
+               if (deploymentDescriptors.length == 0) {
+                       // allow multiple args for one key
+                       deploymentDescriptors = Utils.getMultipleArg2("-dd", 
args);
+               }
+               for( String arg : args) {
+                       logger.log(Level.INFO,"+++++++++++++++ arg:"+arg);
+               }
+               saxonURL = Utils.getArg("-saxonURL", args);
+               xslTransform = Utils.getArg("-xslt", args);
+               String threadCount = Utils.getArg("-t", args); // Will be null 
if ducc.deploy.JpThreadCount is undefined
+               if ( threadCount.isEmpty()) {
+                       threadCount = "1";  // default
+               }
+               if (nbrOfArgs < 1 || (deploymentDescriptors.length == 0 || 
saxonURL.equals("") || xslTransform.equals(""))) {
+                       printUsageMessage();
+                       return null; // Done here
+               }
+               deploymentDescriptors[0] = parseDD(deploymentDescriptors[0], 
threadCount);
+               return deploymentDescriptors;
+       }
+
+       /**
+        * Parses given Deployment Descriptor to extract scaleout and to check 
if it has
+        * placeholders for the queue & broker. Generates a converted one if 
necessary,
+        * e.g. for "pull services. The scaleout is used for "pull" services 
and should
+        * become the default for DD jobs.
+        * 
+        * @param ddPath
+        *            - path to the DD
+        * @param threadCount
+        *            - pipeline scaleout value null if undefined)
+        * @return original or converted DD
+        * @throws Exception
+        */
+       public String parseDD(String ddPath, String threadCount) throws 
Exception {
+               // For "custom" pull-services must convert the DD so it can be 
deployed locally
+               String logfile=System.getenv("DUCC_PROCESS_LOG_PREFIX");
+               // For JPs this should return the same file, already converted 
by the JD
+               String logDir = new File(logfile).getParent();
+
+               DeployableGenerator deployableGenerator = new 
DeployableGenerator(logDir);
+           DuccUimaReferenceByName configuration = new 
DuccUimaReferenceByName(0, ddPath);
+           String ddNew = deployableGenerator.generateDd(configuration, 
System.getenv("DUCC_JOBID"), true);
+               
+               if (ddNew != ddPath) {
+                       logger.log(Level.INFO, "Generated " + ddNew + " from " 
+ ddPath);
+               }
+
+               int ddScaleout = deployableGenerator.getScaleout();
+               if (threadCount == null) {
+                       scaleout = ddScaleout;
+                       logger.log(Level.INFO, "DD specifies a scaleout of " + 
scaleout);
+               } else {
+                       scaleout = Integer.parseInt(threadCount);
+                       if (ddScaleout != scaleout) {
+                               logger.log(Level.WARNING,
+                                               "Scaleout specified as " + 
scaleout + " but the DD is configured for " + ddScaleout);
+                       }
+               }
+
+               return ddNew;
+       }
+
+       @Override
+       public void stop() {
+               synchronized (UimaAsServiceProcessor.class) {
+                       if (brokerRunning) {
+                               logger.log(Level.INFO, "Stopping UIMA_AS 
Client");
+                               try {
+                                       // Prevent UIMA-AS from exiting
+                                       System.setProperty("dontKill", "true");
+                                       uimaASClient.stop();
+
+                                       Method brokerStopMethod = 
classToLaunch.getMethod("stop");
+                                       brokerStopMethod.invoke(brokerInstance);
+
+                                       Method waitMethod = 
classToLaunch.getMethod("waitUntilStopped");
+                                       waitMethod.invoke(brokerInstance);
+                                       brokerRunning = false;
+
+                               } catch (Exception e) {
+                                       logger.log(Level.WARNING, "stop", e);
+                               }
+
+                       }
+               }
+
+       }
+
+       private void deployBroker(String duccHome) throws Exception {
+               // Save current context class loader. When done loading the 
broker jars
+               // this class loader will be restored
+               ClassLoader currentCL = 
Thread.currentThread().getContextClassLoader();
+               HashMap<String, String> savedPropsMap = null;
+
+               try {
+                       // setup a classpath for Ducc broker
+                       String[] brokerClasspath = new String[] {
+                                       duccHome + File.separator + 
"apache-uima" + File.separator + "apache-activemq" + File.separator
+                                                       + "lib" + 
File.separator + "*",
+                                       duccHome + File.separator + 
"apache-uima" + File.separator + "apache-activemq" + File.separator
+                                                       + "lib" + 
File.separator + "optional" + File.separator + "*" };
+
+                       // isolate broker in its own Class loader
+                       URLClassLoader ucl = Utils.create(brokerClasspath);
+                       Thread.currentThread().setContextClassLoader(ucl);
+                       savedPropsMap = Utils.hideLoggingProperties(); // 
Ensure DUCC doesn't try to use the user's logging setup
+
+                       classToLaunch = 
ucl.loadClass("org.apache.activemq.broker.BrokerService");
+                       if (System.getProperty("ducc.debug") != null) {
+                               Utils.dump(ucl, 4);
+                       }
+                       brokerInstance = classToLaunch.newInstance();
+
+                       Method setDedicatedTaskRunnerMethod = 
classToLaunch.getMethod("setDedicatedTaskRunner", boolean.class);
+                       setDedicatedTaskRunnerMethod.invoke(brokerInstance, 
false);
+
+                       Method setPersistentMethod = 
classToLaunch.getMethod("setPersistent", boolean.class);
+                       setPersistentMethod.invoke(brokerInstance, false);
+
+                       int port = 61626; // try to start the colocated broker 
with this port first
+                       String brokerURL = "tcp://localhost:";
+                       // loop until a valid port is found for the broker
+                       while (true) {
+                               try {
+                                       Method addConnectorMethod = 
classToLaunch.getMethod("addConnector", String.class);
+                                       
addConnectorMethod.invoke(brokerInstance, brokerURL + port);
+
+                                       Method startMethod = 
classToLaunch.getMethod("start");
+                                       startMethod.invoke(brokerInstance);
+
+                                       Method waitUntilStartedMethod = 
classToLaunch.getMethod("waitUntilStarted");
+                                       
waitUntilStartedMethod.invoke(brokerInstance);
+                                       System.setProperty("DefaultBrokerURL", 
brokerURL + port);
+                                       System.setProperty("BrokerURI", 
brokerURL + port);
+                                       // Needed to resolve ${broker.name} 
placeholder in DD generated by DUCC
+                                       System.setProperty(brokerPropertyName, 
brokerURL + port);
+
+                                       break; // got a valid port for the 
broker
+                               } catch (Exception e) {
+                                       if (isBindException(e)) {
+                                               port++;
+                                       } else {
+                                               throw new RuntimeException(e);
+                                       }
+                               }
+                       }
+
+               } catch (Exception e) {
+                       throw e;
+               } finally {
+                       // restore context class loader
+                       Thread.currentThread().setContextClassLoader(currentCL);
+                       brokerLatch.countDown();
+                       Utils.restoreLoggingProperties(savedPropsMap); // May 
not be necessary as user's logger has been established
+               }
+
+       }
+
+       private boolean isBindException(Throwable e) {
+               if (e == null) {
+                       return false;
+               }
+
+               if (e instanceof BindException) {
+                       return true;
+               } else if (e instanceof SocketException && "Address already in 
use".equals(e.getMessage())) {
+                       return true;
+               } else if (e.getCause() != null) {
+                       return isBindException(e.getCause());
+               } else {
+                       return false;
+               }
+       }
+
+       private static void printUsageMessage() {
+               System.out.println(" Arguments to the program are as follows : 
\n"
+                               + "-d path-to-UIMA-Deployment-Descriptor [-d 
path-to-UIMA-Deployment-Descriptor ...] \n"
+                               + "-saxon path-to-saxon.jar \n" + "-q top level 
service queue name \n"
+                               + "-xslt path-to-dd2spring-xslt\n" + "   or\n"
+                               + "path to Spring XML Configuration File which 
is the output of running dd2spring\n");
+       }
+       public static void main(String[] args) {
+               try {
+                       UimaAsServiceProcessor processor = 
+                                       new UimaAsServiceProcessor(args, null);
+                       processor.initialize();
+                       CAS cas = CasCreationUtils.createCas(new 
TypeSystemDescription_impl(), null, null);
+                       cas.setDocumentLanguage("en");
+                       cas.setDocumentText("Test");
+                       UimaSerializer serializer = 
+                                       new UimaSerializer();
+                       String serializedCas = 
serializer.serializeCasToXmi(cas);
+
+                       IProcessResult result =
+                                       processor.process(serializedCas);
+                       System.out.println("Client Received Result - 
Success:"+(result.getResult()!=null));
+                       processor.stop();
+               } catch( Exception e) {
+                       e.printStackTrace();
+               }
+               
+       }
+       private class UimaAsClientWrapper {
+               private Object uimaASClient;
+               private Class<?> clientClz;
+
+               public UimaAsClientWrapper() throws Exception {
+                       clientClz = Class
+                                       
.forName("org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl");
+                       uimaASClient = clientClz.newInstance();
+                       
+               }
+
+               public String deployService(String aDeploymentDescriptorPath) 
throws Exception {
+
+                       Map<String, Object> appCtx = new HashMap<>();
+
+                       Class<?> clz = 
Class.forName("org.apache.uima.aae.client.UimaAsynchronousEngine");
+
+                       appCtx.put((String) 
clz.getField("DD2SpringXsltFilePath").get(uimaASClient), 
xslTransform.replace('/', FS));
+                       appCtx.put((String) 
clz.getField("SaxonClasspath").get(uimaASClient), saxonURL.replace('/', FS));
+                       appCtx.put((String) 
clz.getField("CasPoolSize").get(uimaASClient), scaleout);
+
+                       String containerId = null;
+                       // use UIMA-AS client to deploy the service using 
provided
+                       // Deployment Descriptor
+                       ClassLoader duccCl = 
Thread.currentThread().getContextClassLoader();
+                       ClassLoader cl = this.getClass().getClassLoader();
+                       Thread.currentThread().setContextClassLoader(cl);
+                       Method deployMethod = 
uimaASClient.getClass().getDeclaredMethod("deploy", String.class, Map.class);
+                       containerId = (String) deployMethod.invoke(uimaASClient,
+                                       new Object[] { 
aDeploymentDescriptorPath, appCtx });
+                       Thread.currentThread().setContextClassLoader(duccCl);
+                       return containerId;
+               }
+
+               private void initialize() throws Exception {
+
+                       String endpoint = System.getProperty(queuePropertyName);
+                       String brokerURL = 
System.getProperty(brokerPropertyName);
+                       Map<String, Object> appCtx = new HashMap<>();
+                       Class<?> clz = 
Class.forName("org.apache.uima.aae.client.UimaAsynchronousEngine");
+
+                       appCtx.put((String) 
clz.getField("ServerUri").get(uimaASClient), brokerURL);
+                       appCtx.put((String) 
clz.getField("ENDPOINT").get(uimaASClient), endpoint);
+                       appCtx.put((String) 
clz.getField("CasPoolSize").get(uimaASClient), scaleout);
+                       appCtx.put((String) 
clz.getField("Timeout").get(uimaASClient), 0);
+                       appCtx.put((String) 
clz.getField("GetMetaTimeout").get(uimaASClient), 0);
+                       appCtx.put((String) 
clz.getField("CpcTimeout").get(uimaASClient), 1100);
+                       Method initMethod = 
uimaASClient.getClass().getMethod("initialize", Map.class);
+                       initMethod.invoke(uimaASClient, new Object[] { appCtx 
});
+
+                       // blocks until the client initializes
+                       Method getMetaMethod = 
uimaASClient.getClass().getMethod("getMetaData");
+                       Object meta = getMetaMethod.invoke(uimaASClient);
+
+                       Method nameMethod = 
meta.getClass().getMethod("getName");
+                       aeName = (String) nameMethod.invoke(meta);
+               }
+               public CAS getCAS() throws Exception {
+                       Method getCasMethod = 
uimaASClient.getClass().getMethod("getCAS");
+                       return (CAS) getCasMethod.invoke(uimaASClient);
+               }
+               public void sendAndReceive(CAS cas, List<?> perfMetrics) throws 
Exception {
+                       Method sendMethod = 
uimaASClient.getClass().getMethod("sendAndReceiveCAS", CAS.class,
+                                       List.class);
+                       sendMethod.invoke(uimaASClient, new Object[] { cas, 
perfMetrics });
+               }
+               public void sendAndReceive(CAS cas) throws Exception {
+                       Method sendMethod = 
uimaASClient.getClass().getDeclaredMethod("sendAndReceiveCAS", CAS.class);
+                       sendMethod.invoke(uimaASClient, new Object[] { cas });
+               }
+               public void stop() throws Exception {
+                       Method clientStopMethod = 
uimaASClient.getClass().getDeclaredMethod("stop");
+                       clientStopMethod.invoke(uimaASClient);
+               }
+               
+
+       }
+       private class UimaAsVersionWrapper {
+               Class<?> clz = null;
+               Method m = null;
+
+               
+               public UimaAsVersionWrapper() throws Exception {
+                       clz = 
Class.forName("org.apache.uima.aae.UimaAsVersion");
+               }
+
+               public String getFullVersion() throws Exception {
+                       m = clz.getDeclaredMethod("getFullVersionString");
+                       return (String)m.invoke(null);
+               }
+               public int getMajor() throws Exception {
+                       Method majorVersionMethod = 
clz.getDeclaredMethod("getMajorVersion");
+                       return (short) majorVersionMethod.invoke(null);
+               }
+               public int getMinor() throws Exception {
+                       Method minorVersionMethod = 
clz.getDeclaredMethod("getMinorVersion");
+                       return (short) minorVersionMethod.invoke(null);
+               }
+               public int getRevision() throws Exception {
+                       Method buildRevisionMethod = 
clz.getDeclaredMethod("getBuildRevision");
+                       return (short) buildRevisionMethod.invoke(null);
+               }
+
+       }
+}

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
 Tue Jul 17 13:49:47 2018
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -56,18 +56,17 @@ import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 import org.apache.uima.util.XMLInputSource;
 
-public class UimaServiceProcessor implements IServiceProcessor {
+public class UimaServiceProcessor extends AbstractServiceProcessor implements 
IServiceProcessor {
        public static final String IMPORT_BY_NAME_PREFIX = "*importByName:";
        Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
    // Map to store DuccUimaSerializer instances. Each has affinity to a thread
-       private Map<Long, UimaSerializer> serializerMap;
        private IServiceResultSerializer resultSerializer;
        // stores AE instance pinned to a thread
-       private ThreadLocal<AnalysisEngine> threadLocal =
+       private ThreadLocal<AnalysisEngine> threadLocal = 
                        new ThreadLocal<> ();
     private ReentrantLock initStateLock = new ReentrantLock();
     private boolean sendInitializingState = true;
-       private ResourceManager rm =
+       private ResourceManager rm = 
                        UIMAFramework.newDefaultResourceManager();;
     private CasPool casPool = null;
        private int scaleout=1;
@@ -78,7 +77,7 @@ public class UimaServiceProcessor implem
        private ServiceConfiguration serviceConfiguration;
        private IServiceMonitor monitor;
        private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
-
+       
        static {
                // try to get platform MBean Server (Java 1.5 only)
                try {
@@ -88,8 +87,8 @@ public class UimaServiceProcessor implem
                } catch (Exception e) {
                        platformMBeanServer = null;
                }
-       }
-
+       }       
+       
        public UimaServiceProcessor(String analysisEngineDescriptor) {
                this(analysisEngineDescriptor,  new 
UimaResultDefaultSerializer(), new ServiceConfiguration());
        }
@@ -106,7 +105,7 @@ public class UimaServiceProcessor implem
                  serializerMap = new HashMap<>();
                }
        }
-
+       
        private void launchStateInitializationCollector() {
                monitor =
                                new RemoteStateObserver(serviceConfiguration, 
logger);
@@ -120,9 +119,10 @@ public class UimaServiceProcessor implem
 
        @Override
        public void initialize() {
+
                if ( logger.isLoggable(Level.FINE)) {
                        logger.log(Level.FINE, "Process Thread:"+ 
Thread.currentThread().getName()+" Initializing AE");
-
+                       
                }
                try {
                        // multiple threads may call this method. Send 
initializing state once
@@ -132,7 +132,7 @@ public class UimaServiceProcessor implem
                                
monitor.onStateChange(IServiceState.State.Initializing.toString(), new 
Properties());
                        }
                } catch( Exception e) {
-
+                       
                } finally {
                        initStateLock.unlock();
                }
@@ -141,9 +141,9 @@ public class UimaServiceProcessor implem
            HashMap<String,Object> paramsMap = new HashMap<>();
         paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, rm);
            paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, 
platformMBeanServer);
-
+           
                try {
-
+               
                        XMLInputSource is =
                                        
UimaUtils.getXMLInputSource(analysisEngineDescriptor);
                        String aed = is.getURL().toString();
@@ -160,22 +160,23 @@ public class UimaServiceProcessor implem
                                
initializeCasPool(ae.getAnalysisEngineMetaData());
                        }
                        }
-
                        // every process thread has its own uima deserializer
                        if (serviceConfiguration.getJpType() != null) {
                          serializerMap.put(Thread.currentThread().getId(), new 
UimaSerializer());
                        }
 
                } catch (Exception e) {
+                       logger.log(Level.WARNING, null, e);
                        
monitor.onStateChange(IServiceState.State.FailedInitialization.toString(), new 
Properties());
                        throw new RuntimeException(e);
 
-               }
+               } 
                if ( logger.isLoggable(Level.INFO)) {
                        logger.log(Level.INFO, "Process Thread:"+ 
Thread.currentThread().getName()+" Done Initializing AE");
-
+                       
                }
                if ( numberOfInitializedThreads.incrementAndGet() == scaleout ) 
{
+                       super.delay(logger, 30000);
                        
monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
                }
        }
@@ -188,17 +189,17 @@ public class UimaServiceProcessor implem
                casPool = new CasPool(scaleout, analysisEngineMetadata, rm);
        }
 
-       private UimaSerializer getUimaSerializer() {
-               return serializerMap.get(Thread.currentThread().getId());
-       }
-
+//     private UimaSerializer getUimaSerializer() {
+//             
+//             return serializerMap.get(Thread.currentThread().getId());
+//     }
        @Override
        public IProcessResult process(String serializedTask) {
                AnalysisEngine ae = null;
 
                CAS cas = casPool.getCas();
                IProcessResult result;
-
+               
                try {
                  // DUCC JP  services are given a serialized CAS ... others 
just the doc-text for a CAS
                        if (serviceConfiguration.getJpType() != null) {
@@ -207,36 +208,27 @@ public class UimaServiceProcessor implem
                          cas.setDocumentText(serializedTask);
                          cas.setDocumentLanguage("en");
                        }
-
                        // check out AE instance pinned to this thread
                        ae = threadLocal.get();
                        // get AE metrics before calling process(). Needed for
                        // computing a delta
-                       List<PerformanceMetrics> beforeAnalysis =
+                       List<PerformanceMetrics> beforeAnalysis = 
                                        UimaMetricsGenerator.get(ae);
-
+                       
                        // *****************************************************
                        // PROCESS
                        // *****************************************************
                        ae.process(cas);
-
+                       
                        // *****************************************************
-                       // No exception in process() , fetch metrics
+                       // No exception in process() , fetch metrics 
                        // *****************************************************
-                       List<PerformanceMetrics> afterAnalysis =
+                       List<PerformanceMetrics> afterAnalysis = 
                                        UimaMetricsGenerator.get(ae);
 
                        // get the delta
-                       List<PerformanceMetrics> casMetrics =
+                       List<PerformanceMetrics> casMetrics = 
                                        UimaMetricsGenerator.getDelta( 
afterAnalysis, beforeAnalysis);
-
-//                     StringBuilder sb = new StringBuilder("{");
-//
-//                     for (AnalysisEnginePerformanceMetrics metrics : 
casMetrics) {
-//                             
sb.append(resultSerializer.serialize(metrics)).append("}");
-//                     }
-//                     sb.append("}");
-
                        return new 
UimaProcessResult(resultSerializer.serialize(casMetrics));
                } catch( Exception e ) {
                        logger.log(Level.WARNING,"",e);
@@ -244,14 +236,14 @@ public class UimaServiceProcessor implem
                        return result;
                }
                finally {
-
+                       
                        if (cas != null) {
                                casPool.releaseCas(cas);
                        }
                }
        }
 
-
+       
        public void setErrorHandler(IServiceErrorHandler errorHandler) {
 
        }
@@ -266,7 +258,7 @@ public class UimaServiceProcessor implem
                        }
                } catch( Exception e) {
                        logger.log(Level.WARNING, "stop", e);
-               }
+               } 
        }
        /*
          // Build just an AE from parts and return the filename
@@ -293,3 +285,4 @@ public class UimaServiceProcessor implem
          }
 */
 }
+

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
 Tue Jul 17 13:49:47 2018
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,8 +44,8 @@ import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 
 /**
- *
- * This protocol handler is a Runnable
+ * 
+ * This protocol handler is a Runnable 
  *
  */
 public class DefaultServiceProtocolHandler implements IServiceProtocolHandler {
@@ -66,13 +66,13 @@ public class DefaultServiceProtocolHandl
        private IService service;
        // forces process threads to initialize serially
        private ReentrantLock initLock = new ReentrantLock();
-
+       
        private static AtomicInteger idGenerator = new AtomicInteger();
 
-
-       private DefaultServiceProtocolHandler(Builder builder) {
-               this.initLatch = builder.initLatch;
-               this.stopLatch = builder.stopLatch;
+       
+       private DefaultServiceProtocolHandler(Builder builder) { 
+               this.initLatch = builder.initLatch; 
+               this.stopLatch = builder.stopLatch; 
                this.service = builder.service;
                this.transport = builder.transport;
                this.processor = builder.processor;
@@ -100,11 +100,12 @@ public class DefaultServiceProtocolHandl
                        // use a lock to serialize initialization one thread at 
a time
                        initLock.lock();
                        processor.initialize();
-               } catch (Exception e) {
+               } catch (Throwable e) {
                        initError = true;
                        running = false;
+                       logger.log(Level.WARNING, "ProtocolHandler initialize() 
failed -",e);
                        throw new ServiceInitializationException(
-                                       "Thread:" + 
Thread.currentThread().getName() + " Failed initialization", e);
+                                       "Thread:" + 
Thread.currentThread().getName() + " Failed initialization");
                } finally {
 
                        initLatch.countDown();
@@ -138,7 +139,7 @@ public class DefaultServiceProtocolHandl
                                throw new TransportException("Received invalid 
content (null) in response from client - rejecting request");
                        }
                        o = XStreamUtils.unmarshall(content);
-
+                       
                } catch ( Exception e) {
                        if ( !running ) {
                                throw new TransportException("Service stopping 
- rejecting request");
@@ -171,7 +172,7 @@ public class DefaultServiceProtocolHandl
        }
 
        private IMetaTaskTransaction callGet(IMetaTaskTransaction transaction) 
throws Exception {
-               transaction.setType(Type.Get);
+               transaction.setType(Type.Get); 
                if ( logger.isLoggable(Level.FINE)) {
                        logger.log(Level.FINE, "ProtocolHandler calling GET");
                }
@@ -179,7 +180,7 @@ public class DefaultServiceProtocolHandl
        }
        /**
         * Block until service start() is called
-        *
+        * 
         * @throws ServiceInitializationException
         */
        private void awaitStart() throws ServiceInitializationException {
@@ -194,25 +195,19 @@ public class DefaultServiceProtocolHandl
                // we may fail in initialize() in which case the 
ServiceInitializationException
                // is thrown
                initialize();
-
+               
                // now wait for application to call start
                awaitStart();
-
+               
                // all threads intialized, enter running state
 
                IMetaTaskTransaction transaction = null;
-
+               
                if ( logger.isLoggable(Level.INFO)) {
                        logger.log(Level.INFO, ".............. Thread 
"+Thread.currentThread().getId() + " ready to process");
                }
 
-               logger.log(Level.INFO, "Wait for the initialized state to 
propagate to the SM " +
-                         "so any processing errors are not treates as 
initialization failures");
-               try {
-                       Thread.sleep(30000);
-               } catch (InterruptedException e1) {
-               };
-
+               
                while (running) {
 
                        try {
@@ -224,13 +219,13 @@ public class DefaultServiceProtocolHandl
                                        break;
                                }
                                if (transaction.getMetaTask() == null || 
transaction.getMetaTask().getUserSpaceTask() == null ) {
-                                       // the client has no tasks to give.
+                                       // the client has no tasks to give. 
                                        noTaskStrategy.handleNoTaskSupplied();
                                        continue;
                                }
                                Object task = 
transaction.getMetaTask().getUserSpaceTask();
-
-                               // send ACK
+                               
+                               // send ACK 
                                transaction = callAck(transaction);
                                if (!running) {
                                        break;
@@ -265,8 +260,8 @@ public class DefaultServiceProtocolHandl
                                        }).start();
                                        running = false;
                                }
-
-
+                                       
+                               
 
                        } catch( IllegalStateException e) {
                                break;
@@ -275,14 +270,17 @@ public class DefaultServiceProtocolHandl
                        }
                        catch (Exception e) {
                                logger.log(Level.WARNING,"",e);
-                       }
+                       }               
                }
                stopLatch.countDown();
+               if ( processor != null ) {
+                       processor.stop();
+               }
                logger.log(Level.INFO,"ProtocolHandler terminated");
                return String.valueOf(Thread.currentThread().getId());
        }
 
-
+       
        private void delegateStop() {
                service.stop();
        }
@@ -308,8 +306,8 @@ public class DefaultServiceProtocolHandl
        public void setTransport(IServiceTransport transport) {
                this.transport = transport;
        }
-
-
+       
+       
         public static class Builder {
                        private IServiceTransport transport;
                        private IServiceProcessor processor;
@@ -326,15 +324,15 @@ public class DefaultServiceProtocolHandl
                        public Builder withProcessor(IServiceProcessor 
processor) {
                                this.processor = processor;
                                return this;
-                       }
+                       }                       
                        public Builder withInitCompleteLatch(CountDownLatch 
initLatch) {
                                this.initLatch = initLatch;
                                return this;
-                       }
+                       }                       
                        public Builder withDoneLatch(CountDownLatch stopLatch) {
                                this.stopLatch = stopLatch;
                                return this;
-                       }
+                       }                       
                        public Builder 
withNoTaskStrategy(INoTaskAvailableStrategy strategy) {
                                this.strategy = strategy;
                                return this;

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
 Tue Jul 17 13:49:47 2018
@@ -1,21 +1,4 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
-*/
+
 package org.apache.uima.ducc.ps.service;
 
 import java.util.Timer;
@@ -39,6 +22,7 @@ public class JunitPullServiceTestCase ex
                int scaleout = 2;
                super.startJetty(false);  // don't block
                String analysisEngineDescriptor = "TestAAE";
+               System.setProperty("ducc.deploy.JpType", "uima");
                IServiceProcessor processor = new 
                                UimaServiceProcessor(analysisEngineDescriptor);
 
@@ -52,7 +36,7 @@ public class JunitPullServiceTestCase ex
                        service.initialize();
                        Timer fTimer = new Timer("testPullService Timer");
                        // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer), 5000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer), 
35000);
                        
                        service.start();
 
@@ -81,7 +65,7 @@ public class JunitPullServiceTestCase ex
                        System.out.println("----------- Starting Service 
.....");
                        Timer fTimer = new Timer();
                        //after 10sec stop the service
-                       fTimer.schedule(new MyTimerTask(service, fTimer), 
10000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer), 
40000);
 
                        service.start();
 
@@ -92,7 +76,7 @@ public class JunitPullServiceTestCase ex
                        throw e;
                }
        }
-       
+       /*
        @Test
        public void testPullServiceWithProcessFailure() throws Exception {
                int scaleout = 2;
@@ -112,7 +96,7 @@ public class JunitPullServiceTestCase ex
                        service.initialize();
                        Timer fTimer = new Timer("testPullService Timer");
                        // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer), 5000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer), 
35000);
                        
                        service.start();
 
@@ -125,7 +109,7 @@ public class JunitPullServiceTestCase ex
                }
        }
        
-       /*
+       
        @Test
        public void testPullServiceBadClientURL() throws Exception {
                int scaleout = 2;

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
 Tue Jul 17 13:49:47 2018
@@ -37,6 +37,7 @@ public class JUnitServiceWrapperTestCase
                System.out.println("........... Monitor 
Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT"));
                super.startJetty(false);  // don't block
                String analysisEngineDescriptor = "TestAAE";
+               System.setProperty("ducc.deploy.JpType", "uima");
 
                String tasURL = "http://localhost:8080/test";;
                try {
@@ -49,7 +50,7 @@ public class JUnitServiceWrapperTestCase
 
                        Timer fTimer = new Timer("testPullService Timer");
                        // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer), 5000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer), 
35000);
                                
                        service.initialize(new String[] 
{analysisEngineDescriptor});
 
@@ -65,7 +66,7 @@ public class JUnitServiceWrapperTestCase
 
                }
        }
-       
+       /*
        @Test
        public void testPullServiceWrapperWithProcessFailure() throws Exception 
{
                //int scaleout = 2;
@@ -105,6 +106,7 @@ public class JUnitServiceWrapperTestCase
                        System.getProperties().remove("ProcessFail");
                }
        }
+*/
        class MyTimerTask extends TimerTask {
                final ServiceWrapper service;
                final Timer fTimer;

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java
 Tue Jul 17 13:49:47 2018
@@ -33,7 +33,7 @@ import org.apache.uima.ducc.container.jd
 import org.apache.uima.ducc.container.jd.cas.CasManager;
 import org.apache.uima.ducc.container.jd.cas.CasManagerStats;
 import org.apache.uima.ducc.container.jd.mh.IMessageHandler;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
 import 
org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
 
 public class JobDriverComponent extends AbstractDuccComponent
@@ -126,7 +126,7 @@ implements IJobDriverComponent {
                return logger;
        }
        
-       public void handleJpRequest(IMetaCasTransaction metaCasTransaction) 
throws Exception {
+       public void handleJpRequest(IMetaTaskTransaction metaCasTransaction) 
throws Exception {
                String location = "handleJpRequest";
                try {
                        IMessageHandler mh = 
JobDriver.getInstance().getMessageHandler();

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java
 Tue Jul 17 13:49:47 2018
@@ -37,9 +37,9 @@ import org.apache.uima.ducc.common.utils
 import org.apache.uima.ducc.common.utils.XStreamUtils;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.container.jd.mh.MessageHandler;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
-import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Direction;
+import org.apache.uima.ducc.ps.net.impl.MetaTaskTransaction;
 import org.apache.uima.ducc.transport.DuccTransportConfiguration;
 import 
org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
 import org.apache.uima.ducc.transport.dispatcher.ProcessStateDispatcher;
@@ -154,8 +154,8 @@ import org.springframework.context.annot
                        }
                    public void process(Exchange exchange) throws Exception {
                        // Get the transaction object sent by the JP
-                       IMetaCasTransaction imt = 
-                                       
exchange.getIn().getBody(MetaCasTransaction.class);
+                       IMetaTaskTransaction imt = 
+                                       
exchange.getIn().getBody(MetaTaskTransaction.class);
                        
                        // process JP's request
                        jdc.handleJpRequest(imt);
@@ -259,12 +259,12 @@ import org.springframework.context.annot
                                        //request.getReader().read(content);
                                        logger.debug("doPost",jobid, "Http 
Request Body:::"+String.valueOf(content));
                                        
-                                       IMetaCasTransaction imt=null;
+                                       IMetaTaskTransaction imt=null;
                                        //String t = String.valueOf(content);
                                                
 //                                     imt = (IMetaCasTransaction) XStreamUtils
 //                                                                     
.unmarshall(t.trim());
-                                       imt = (IMetaCasTransaction) XStreamUtils
+                                       imt = (IMetaTaskTransaction) 
XStreamUtils
                                                        .unmarshall(content);
                                        
MessageHandler.accumulateTimes("Unmarshall", post_stime);
                                

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java
 Tue Jul 17 13:49:47 2018
@@ -19,8 +19,8 @@
 
 package org.apache.uima.ducc.transport.configuration.jd.iface;
 
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
 
 public interface IJobDriverComponent {
-       public void handleJpRequest(IMetaCasTransaction metaCasTransaction) 
throws Exception;
+       public void handleJpRequest(IMetaTaskTransaction metaCasTransaction) 
throws Exception;
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
 Tue Jul 17 13:49:47 2018
@@ -44,15 +44,15 @@ import org.apache.uima.ducc.common.IDucc
 import org.apache.uima.ducc.common.NodeIdentity;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.XStreamUtils;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
-import org.apache.uima.ducc.container.net.iface.IPerformanceMetrics;
-import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
-import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
-import org.apache.uima.ducc.container.net.impl.TransactionId;
 import org.apache.uima.ducc.container.sd.ServiceRegistry;
 import org.apache.uima.ducc.container.sd.ServiceRegistry_impl;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Direction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type;
+import org.apache.uima.ducc.ps.net.iface.IPerformanceMetrics;
+import org.apache.uima.ducc.ps.net.impl.MetaTaskTransaction;
+import org.apache.uima.ducc.ps.net.impl.PerformanceMetrics;
+import org.apache.uima.ducc.ps.net.impl.TransactionId;
 
 public class DuccHttpClient {
   private final static String REGISTERED_DRIVER = 
"ducc.deploy.registered.driver";
@@ -219,7 +219,7 @@ public class DuccHttpClient {
                return pn;
        }
        
-    private void addCommonHeaders( IMetaCasTransaction transaction ) {
+    private void addCommonHeaders( IMetaTaskTransaction transaction ) {
        String location = "addCommonHeaders";
        transaction.setRequesterAddress(getIP());
        transaction.setRequesterNodeName(getNodeName());
@@ -246,9 +246,9 @@ public class DuccHttpClient {
                
     }
 
-       public IMetaCasTransaction execute( IMetaCasTransaction transaction, 
HttpPost postMethod ) throws Exception {
+       public IMetaTaskTransaction execute( IMetaTaskTransaction transaction, 
HttpPost postMethod ) throws Exception {
                Exception lastError = null;
-               IMetaCasTransaction reply=null;
+               IMetaTaskTransaction reply=null;
 
                addCommonHeaders(transaction);
                transaction.setDirection(Direction.Request);
@@ -302,8 +302,8 @@ public class DuccHttpClient {
                                        logger.error("execute", null, 
"Thread:"+Thread.currentThread().getId()+" ERRR::Content causing 
error:"+content,ex);
                                        throw ex;
                                }
-                               if ( o instanceof IMetaCasTransaction) {
-                                       reply = (MetaCasTransaction)o;
+                               if ( o instanceof IMetaTaskTransaction) {
+                                       reply = (MetaTaskTransaction)o;
                                } else {
                                        throw new 
InvalidClassException("Expected IMetaCasTransaction - Instead Received 
"+o.getClass().getName());
                                }
@@ -325,7 +325,7 @@ public class DuccHttpClient {
                } 
        }
 
-       private HttpResponse retryUntilSuccessfull(IMetaCasTransaction 
transaction, HttpPost postMethod) throws Exception {
+       private HttpResponse retryUntilSuccessfull(IMetaTaskTransaction 
transaction, HttpPost postMethod) throws Exception {
         HttpResponse response=null;
                // Only one thread attempts recovery. Other threads will block 
here
                // until connection to the remote is restored. 
@@ -370,7 +370,7 @@ public class DuccHttpClient {
                        //client.setTimeout(30000);
                        client.initialize(args[0]);
                        int minor = 0;
-                       IMetaCasTransaction transaction = new 
MetaCasTransaction();
+                       IMetaTaskTransaction transaction = new 
MetaTaskTransaction();
                        AtomicInteger seq = new AtomicInteger(0);
                        TransactionId tid = new 
TransactionId(seq.incrementAndGet(), minor);
                        transaction.setTransactionId(tid);
@@ -382,7 +382,7 @@ public class DuccHttpClient {
                        // send a request to JD and wait for a reply
                transaction = client.execute(transaction, postMethod);
                // The JD may not provide a Work Item to process.
-               if ( transaction.getMetaCas()!= null) {
+               if ( transaction.getMetaTask()!= null) {
                        // Confirm receipt of the CAS. 
                                transaction.setType(Type.Ack);
                                //command = Type.Ack.name();
@@ -395,11 +395,11 @@ public class DuccHttpClient {
                        //command = Type.End.name();
                        tid = new TransactionId(seq.incrementAndGet(), minor++);
                        transaction.setTransactionId(tid);
-                       IPerformanceMetrics metricsWrapper =
-                                       new PerformanceMetrics();
+               //      IPerformanceMetrics metricsWrapper =
+               //                      new PerformanceMetrics();
                        
-                       metricsWrapper.set(Arrays.asList(new Properties()));
-                       
transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
+                       //metricsWrapper.set(Arrays.asList(new Properties()));
+                       
//transaction.getMetaTask().setPerformanceMetrics(metricsWrapper);
                        
                        transaction = client.execute(transaction, postMethod); 
 

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
 Tue Jul 17 13:49:47 2018
@@ -24,6 +24,7 @@ import java.io.ObjectOutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -33,12 +34,15 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.http.client.methods.HttpPost;
 import org.apache.uima.ducc.common.utils.DuccLogger;
-import org.apache.uima.ducc.container.net.iface.IMetaCas;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
-import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
-import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
-import org.apache.uima.ducc.container.net.impl.TransactionId;
+import org.apache.uima.ducc.ps.net.iface.IMetaTask;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type;
+import org.apache.uima.ducc.ps.net.impl.MetaTaskTransaction;
+
+import org.apache.uima.ducc.ps.net.impl.TransactionId;
+import org.apache.uima.ducc.ps.service.processor.IServiceResultSerializer;
+import org.apache.uima.ducc.ps.service.processor.uima.utils.PerformanceMetrics;
+import 
org.apache.uima.ducc.ps.service.processor.uima.utils.UimaResultDefaultSerializer;
 import 
org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
@@ -52,8 +56,8 @@ public class HttpWorkerThread implements
        private Object processorInstance = null;
     private static AtomicInteger IdGenerator =
                new AtomicInteger();
-    private Map<String, IMetaCasTransaction> transactionMap =
-               new ConcurrentHashMap<String, IMetaCasTransaction>();
+    private Map<String, IMetaTaskTransaction> transactionMap =
+               new ConcurrentHashMap<String, IMetaTaskTransaction>();
     static AtomicInteger maxFrameworkFailures;
     private int maxFrameworkErrors = 2;   // default
     // define what happens to this jvm when process() method fails.
@@ -63,7 +67,7 @@ public class HttpWorkerThread implements
     
        public HttpWorkerThread(JobProcessComponent component, DuccHttpClient 
httpClient,
                        Object processorInstance, CountDownLatch 
workerThreadCount,
-                       CountDownLatch threadReadyCount, Map<String, 
IMetaCasTransaction> transactionMap,
+                       CountDownLatch threadReadyCount, Map<String, 
IMetaTaskTransaction> transactionMap,
                        AtomicInteger maxFrameworkFailures ) {
                this.duccComponent = component;
                this.httpClient = httpClient;
@@ -85,10 +89,10 @@ public class HttpWorkerThread implements
                }
        }   
 
-       public IMetaCasTransaction getWork(HttpPost postMethod, int major, int 
minor) throws Exception {
+       public IMetaTaskTransaction getWork(HttpPost postMethod, int major, int 
minor) throws Exception {
                String command="";
 
-               IMetaCasTransaction transaction = new MetaCasTransaction();
+               IMetaTaskTransaction transaction = new MetaTaskTransaction();
                try {
                        TransactionId tid = new TransactionId(major, minor);
                        transaction.setTransactionId(tid);
@@ -102,16 +106,16 @@ public class HttpWorkerThread implements
                transaction = httpClient.execute(transaction, postMethod);
                //httpClient.testConnection();
                // The JD may not provide a Work Item to process.
-               if ( transaction != null && transaction.getMetaCas()!= null) {
-                               logger.info("run", 
null,"Thread:"+Thread.currentThread().getId()+" Recv'd 
WI:"+transaction.getMetaCas().getSystemKey());
+               if ( transaction != null && transaction.getMetaTask()!= null) {
+                               logger.info("run", 
null,"Thread:"+Thread.currentThread().getId()+" Recv'd 
WI:"+transaction.getMetaTask().getSystemKey());
                                // Confirm receipt of the CAS. 
                                transaction.setType(Type.Ack);
                                command = Type.Ack.name();
                                tid = new TransactionId(major, minor++);
                                transaction.setTransactionId(tid);
-                               logger.debug("run", 
null,"Thread:"+Thread.currentThread().getId()+" Sending ACK request - 
WI:"+transaction.getMetaCas().getSystemKey());
+                               logger.debug("run", 
null,"Thread:"+Thread.currentThread().getId()+" Sending ACK request - 
WI:"+transaction.getMetaTask().getSystemKey());
                                transaction = httpClient.execute(transaction, 
postMethod); 
-                               if ( transaction.getMetaCas() == null) {
+                               if ( transaction.getMetaTask() == null) {
                                        // this can be the case when a JD 
receives ACK late 
                                        logger.info("run", 
null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd, however there 
is no MetaCas. The JD Cancelled the transaction");
                                } else {
@@ -179,7 +183,7 @@ public class HttpWorkerThread implements
                                // allow agent some time to process 
FailedInitialization event 
                                Thread.sleep(2000);
                        } catch( Exception e) {}
-                       System.exit(1);
+       //              System.exit(1);
                        /* *****************************************/
                        /* *****************************************/
                        /* *****************************************/
@@ -211,7 +215,7 @@ public class HttpWorkerThread implements
                        
                logger.info("HttpWorkerThread.run()", null, "Begin Processing 
Work Items - Thread Id:"+Thread.currentThread().getId());
                try {
-                       IMetaCasTransaction transaction=null;
+                       IMetaTaskTransaction transaction=null;
                        int major = 0;
                        int minor = 0;
                        // Enter process loop. Stop this thread on the first 
process error.
@@ -239,7 +243,7 @@ public class HttpWorkerThread implements
                                        // done. In such case, reduce frequency 
of Get requests
                                        // by sleeping in between Get's. 
Eventually the OR will 
                                        // deallocate this process and the 
thread will exit
-                                       if ( transaction.getMetaCas() == null 
|| transaction.getMetaCas().getUserSpaceCas() == null) {
+                                       if ( transaction.getMetaTask() == null 
|| transaction.getMetaTask().getUserSpaceTask() == null) {
                                          logger.info("run", null, "Client is 
out of work - will retry quietly 
every",duccComponent.getThreadSleepTime()/1000,"secs.");
                                        
                          // Retry at the start of this block as another thread 
may have just exited with work
@@ -247,7 +251,7 @@ public class HttpWorkerThread implements
                                                synchronized 
(HttpWorkerThread.class) {
                                                        
while(duccComponent.isRunning() ) {
                                                          transaction = 
getWork(postMethod, major, ++minor);
-                                                         if ( 
transaction.getMetaCas() != null && transaction.getMetaCas().getUserSpaceCas() 
!= null ) {
+                                                         if ( 
transaction.getMetaTask() != null && 
transaction.getMetaTask().getUserSpaceTask() != null ) {
                                                            logger.info("run", 
null,"Thread:"+Thread.currentThread().getId()+" work flow has restarted");
                                                            break;
                                                          }
@@ -273,7 +277,7 @@ public class HttpWorkerThread implements
                                                        // that key to allow us 
to look up original transaction so that
                                                        // we can send reset 
request to the JD.
                                                        String key = (String)
-                                                                       
getKeyMethod.invoke(processorInstance, 
transaction.getMetaCas().getUserSpaceCas());
+                                                                       
getKeyMethod.invoke(processorInstance, 
transaction.getMetaTask().getUserSpaceTask());
                                                        if ( key != null ) {
                                                                // add 
transaction under th
                                                                
transactionMap.put(key, transaction);
@@ -286,7 +290,7 @@ public class HttpWorkerThread implements
                                                        // using java 
reflection, call process to analyze the CAS. While 
                                                        // we are blocking, 
user code may issue investment reset asynchronously.
                                                         List<Properties> 
metrics = (List<Properties>)processMethod.
-                                                          
invoke(processorInstance, transaction.getMetaCas().getUserSpaceCas());
+                                                          
invoke(processorInstance, transaction.getMetaTask().getUserSpaceTask());
                                                        //    
***********************************
                                                        if ( key != null ) {
                                 // process ended we no longer expect 
investment reset from user
@@ -295,10 +299,29 @@ public class HttpWorkerThread implements
                                                        }
                                                        
                                    logger.debug("run", 
null,"Thread:"+Thread.currentThread().getId()+" process() completed");
-                                                       PerformanceMetrics 
metricsWrapper =
-                                                                       new 
PerformanceMetrics();
-                                                       
metricsWrapper.set(metrics);
-                                                       
transaction.getMetaCas().setPerformanceMetrics(metricsWrapper);
+                                                       //PerformanceMetrics 
metricsWrapper =
+                                                       //              new 
PerformanceMetrics();
+                                               //      
metricsWrapper.set(metrics);
+                                                       
IServiceResultSerializer deserializer =
+                                                                       new 
UimaResultDefaultSerializer();
+                                                       
+                                                       /*
+                                                                               
        p.setProperty("name", metrics.getName());
+                               p.setProperty("uniqueName", 
metrics.getUniqueName());
+                               p.setProperty("analysisTime",
+                                               
String.valueOf(metrics.getAnalysisTime()));
+                               p.setProperty("numProcessed",
+                                               
String.valueOf(metrics.getNumProcessed()));
+
+                                                        */
+                                                       
List<PerformanceMetrics> pmList = new ArrayList<PerformanceMetrics>();
+                                                       for( Properties p : 
metrics) {
+                                                               
PerformanceMetrics pm = 
+                                                                               
new PerformanceMetrics(p.getProperty("name"), p.getProperty("uniqueName"), 
Long.parseLong(p.getProperty("analysisTime")));
+                                                               pmList.add(pm);
+                                                       }
+                                                       
+                                                       
transaction.getMetaTask().setPerformanceMetrics(deserializer.serialize(pmList));
                                                        
                                                }  catch( 
InvocationTargetException ee) {
                                                        
@@ -315,7 +338,7 @@ public class HttpWorkerThread implements
                                                        if ( 
!duccComponent.isRunning() ) {
                                                                break;
                                                        }
-                                                       IMetaCas mc = 
transaction.getMetaCas();
+                                                       IMetaTask mc = 
transaction.getMetaTask();
                                                        //byte[] 
serializedException = null;
                                                        Method 
getLastSerializedErrorMethod = 
processorInstance.getClass().getDeclaredMethod("getLastSerializedError");
                                                        byte[] 
serializedException =
@@ -342,10 +365,10 @@ public class HttpWorkerThread implements
                                                        
transaction.getMetaCas().setUserSpaceException(baos.toByteArray());
                                                    */
                                                        logger.error("run", 
null, ee);
-                                                       
transaction.getMetaCas().setUserSpaceException(serializedException);            
                                                
+                                                       
transaction.getMetaTask().setUserSpaceException(serializedException);           
                                                
                                                }
                                                // Dont return serialized CAS 
to reduce the msg size
-                                               
transaction.getMetaCas().setUserSpaceCas(null);
+                                               
transaction.getMetaTask().setUserSpaceTask(null);
                                                transaction.setType(Type.End);
                                                //String command = 
Type.End.name();
                                                
@@ -369,7 +392,7 @@ public class HttpWorkerThread implements
 
                                                String wid = null;
                        try {
-                               wid = transaction.getMetaCas().getSystemKey();
+                               wid = transaction.getMetaTask().getSystemKey();
                        } catch( Exception e) {
                                
                        }

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
 Tue Jul 17 13:49:47 2018
@@ -43,8 +43,8 @@ import org.apache.uima.ducc.common.compo
 import org.apache.uima.ducc.common.container.FlagsHelper;
 import org.apache.uima.ducc.common.main.DuccService;
 import org.apache.uima.ducc.common.utils.DuccLogger;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
 public class JobProcessComponent extends AbstractDuccComponent 
@@ -67,8 +67,8 @@ implements IJobProcessor{
        Lock stateLock = new ReentrantLock();
        
     private volatile boolean uimaASJob=false;
-    Map<String, IMetaCasTransaction> transactionMap =
-               new ConcurrentHashMap<String, IMetaCasTransaction>();
+    Map<String, IMetaTaskTransaction> transactionMap =
+               new ConcurrentHashMap<String, IMetaTaskTransaction>();
     
     final static Lock lock = new ReentrantLock();;
     
@@ -164,10 +164,10 @@ implements IJobProcessor{
        public void resetInvestment(String key) throws Exception {
                if ( httpClient != null && transactionMap.containsKey(key) ) {
                        // Fetch a transaction object associated with a WI id 
(key)
-                       IMetaCasTransaction transaction = 
transactionMap.get(key);
+                       IMetaTaskTransaction transaction = 
transactionMap.get(key);
                        HttpPost postMethod = new 
HttpPost(httpClient.getJdUrl());
                        // Dont return serialized CAS to reduce the msg size
-                       transaction.getMetaCas().setUserSpaceCas(null);
+                       transaction.getMetaTask().setUserSpaceTask(null);
                        transaction.setType(Type.InvestmentReset);
                        
                        // Set request timeout
@@ -177,13 +177,13 @@ implements IJobProcessor{
                        // loaded into the user container from which this call 
originated.
                        while( isRunning() ) {
                        try {
-                               logger.info("resetInvestment", null, "User 
Requested Investment Reset - sending request to JD - 
WI:"+transaction.getMetaCas().getSystemKey()+" user key:"+key);
+                               logger.info("resetInvestment", null, "User 
Requested Investment Reset - sending request to JD - 
WI:"+transaction.getMetaTask().getSystemKey()+" user key:"+key);
                                httpClient.execute(transaction, postMethod);
                                break;
                        } catch(SocketTimeoutException  e) {
-                               logger.info("resetInvestment", null, "Timeout 
while waiting for Investment Reset response from JD - retrying - 
WI:"+transaction.getMetaCas().getSystemKey());
+                               logger.info("resetInvestment", null, "Timeout 
while waiting for Investment Reset response from JD - retrying - 
WI:"+transaction.getMetaTask().getSystemKey());
                        } catch(Exception e) {
-                               logger.info("resetInvestment", null, "Error 
while trying send Investment Reset request to JD. Returning to the caller (no 
retries) WI:"+transaction.getMetaCas().getSystemKey());
+                               logger.info("resetInvestment", null, "Error 
while trying send Investment Reset request to JD. Returning to the caller (no 
retries) WI:"+transaction.getMetaTask().getSystemKey());
                                logger.info("resetInvestment", null, e);
                                throw new RuntimeException("Unable to deliver 
Investment Reset request to JD due to "+e.getCause().getMessage());
                        }

Modified: 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/JobDriverReport.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/JobDriverReport.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/JobDriverReport.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/JobDriverReport.java
 Tue Jul 17 13:49:47 2018
@@ -35,7 +35,7 @@ import org.apache.uima.ducc.container.jd
 import 
org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo.CompletionType;
 import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
 import org.apache.uima.ducc.container.jd.mh.iface.IWorkItemInfo;
-import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.JdState;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.JdState;
 import org.apache.uima.ducc.transport.event.common.DuccPerWorkItemStatistics;
 import org.apache.uima.ducc.transport.event.common.DuccProcessWorkItems;
 import 
org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;

Modified: 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java?rev=1836115&r1=1836114&r2=1836115&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java
 Tue Jul 17 13:49:47 2018
@@ -19,6 +19,7 @@
 package org.apache.uima.ducc.user.common.main;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -29,7 +30,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.uima.ducc.user.common.investment.Investment;
-import org.apache.uima.ducc.user.jp.iface.IProcessContainer;
 
 /**
  * Main program that is used to launch Job Process(JP).
@@ -38,10 +38,9 @@ import org.apache.uima.ducc.user.jp.ifac
 public class DuccJobService {
        boolean DEBUG = false;
        private Investment investment = null;
-       private Method stopMethod;
-       private Object duccContainerInstance;
        private Logger logger = 
Logger.getLogger(DuccJobService.class.getName());
-       
+       private IServiceWrapper service = null;
+        
        public static URLClassLoader create(String classPath)
                        throws MalformedURLException {
                return create(classPath.split(":"));
@@ -88,94 +87,88 @@ public class DuccJobService {
                        }
                }
        }
-
+       private void addUrlsToSystemLoader(URL[] urls) throws IOException {
+               URLClassLoader systemClassLoader = (URLClassLoader) ClassLoader
+                               .getSystemClassLoader();
+               try {
+                       Method method = 
URLClassLoader.class.getDeclaredMethod("addURL",
+                                       new Class[] { URL.class });
+                       method.setAccessible(true); // is normally "protected"
+                       for (URL url : urls) {
+                               method.invoke(systemClassLoader, new Object[] { 
url });
+                       }
+               } catch (Throwable t) {
+                       t.printStackTrace();
+                       throw new IOException(
+                                       "Error, could not add URL to system 
classloader");
+               }
+       }
+       private URL[] getUrlsFromDuccClasspath(String[] duccClassPath)
+                       throws MalformedURLException {
+               ArrayList<URL> urlList = new 
ArrayList<URL>(duccClassPath.length);
+               for (String element : duccClassPath) {
+                       if (element.endsWith("*")) {
+                               File dir = new File(element.substring(0, 
element.length() - 1));
+                               File[] files = dir.listFiles(); // Will be null 
if missing or
+                                                                               
                // not a dir
+                               if (files != null) {
+                                       for (File f : files) {
+                                               if 
(f.getName().endsWith(".jar")) {
+                                                       
urlList.add(f.toURI().toURL());
+                                               }
+                                       }
+                               }
+                       } else {
+                               File f = new File(element);
+                               if (f.exists()) {
+                                       urlList.add(f.toURI().toURL());
+                               }
+                       }
+               }
+               URL[] urls = new URL[urlList.size()];
+               return urlList.toArray(urls);
+       }
        public void start(String[] args) throws Exception {
                try {
                investment = new Investment();
-            
-               // cache current context classloader
-                       ClassLoader sysCL = 
Thread.currentThread().getContextClassLoader();
 
                        // Fetch a classpath for the fenced Ducc container
                        String duccContainerClasspath = 
System.getProperty("ducc.deploy.DuccClasspath");
-                       URLClassLoader ucl = create(duccContainerClasspath);
+                       if ( duccContainerClasspath != null ) {
+                               URL[] urls = 
getUrlsFromDuccClasspath(duccContainerClasspath.split(":"));
+                               addUrlsToSystemLoader(urls);
+                       }
                        if (System.getProperty("ducc.debug") != null) {
                                DEBUG = true;
                        }
                        if (DEBUG) {
-                               dump(ucl, 4);
+                               dump((URLClassLoader) ClassLoader
+                                               .getSystemClassLoader(), 4);
                        }
-
-                       // Load the DuccService class and find its methods of 
interest
-                       Class<?> duccServiceClass = 
ucl.loadClass("org.apache.uima.ducc.common.main.DuccService");
-                       Method bootMethod = duccServiceClass.getMethod("boot", 
String[].class);
-                       Method setProcessorMethod = 
duccServiceClass.getMethod("setProcessor", Object.class, String[].class);
-                       Method registerInvestmentInstanceMethod = 
duccServiceClass.getMethod("registerInvestmentInstance", Object.class);
-                       Method startMethod = 
duccServiceClass.getMethod("start");
-                       stopMethod = duccServiceClass.getMethod("stop");
-
-                       // Establish user's logger early to prevent the DUCC 
code from accidentally doing so
-                       logger.log(Level.INFO, ">>>>>>>>> Booting Ducc 
Container");
-
-                       HashMap<String, String> savedPropsMap = 
hideLoggingProperties();  // Ensure DUCC doesn't try to use the user's logging 
setup
-                       
-                       // Construct & initialize Ducc fenced container. 
-                       // It calls component's Configuration class
-                       Thread.currentThread().setContextClassLoader(ucl);
-                       duccContainerInstance = duccServiceClass.newInstance();
-                       bootMethod.invoke(duccContainerInstance, (Object) args);
-
-                       logger.log(Level.INFO, "<<<<<<<< Ducc Container 
booted");
-                       restoreLoggingProperties(savedPropsMap);  // May not be 
necessary as user's logger has been established
-                       
-                       // below property is set by component's Configuration 
class. It can also
-                       // be provided on the command line in case a custom 
processor is needed.
-                       String processorClass = 
System.getProperty("ducc.deploy.JpProcessorClass");
-
-                       // Instantiate process container where the actual 
analysis will be done.
-                       // Currently there are three containers:
-                       // 1 - UimaProcessContainer - used for pieces parts 
(UIMA only)
-                       // 2 - UimaASProcessContainer - used for DD jobs
-                       // 3 - UimaASServiceContainer - used for UIMA-AS based 
services
-                       //
-                       // NOTE: the container class is loaded by the main 
System classloader
-                       //       and requires uima-ducc-user jar to be in the 
System classpath.
-                       // 
--------------------------------------------------------------------
-                       // load the process container class using the initial 
system class loader
-                       Class<?> processorClz = sysCL.loadClass(processorClass);
-                       IProcessContainer pc = (IProcessContainer) 
processorClz.newInstance();
-
-                       logger.log(Level.INFO, ">>>>>>>>> Running Ducc 
Container");
- 
-                       // Call DuccService.setProcessor() to hand-off instance 
of the 
-                       // process container to the component along with this 
process args
-                       setProcessorMethod.invoke(duccContainerInstance, pc, 
args);
-
-                       // Hand-off investment object
-                       
registerInvestmentInstanceMethod.invoke(duccContainerInstance, investment);
-                       
-               // Call DuccService.start() to initialize the process and begin 
processing
-                       startMethod.invoke(duccContainerInstance);
+               service = ServiceFactory.newService();
+               service.initialize(args);
+               service.start();
+               
+//                     HashMap<String, String> savedPropsMap = 
hideLoggingProperties();  // Ensure DUCC doesn't try to use the user's logging 
setup
+//                     restoreLoggingProperties(savedPropsMap);  // May not be 
necessary as user's logger has been established
 
                        logger.log(Level.INFO, "<<<<<<<< Ducc Container ended");
                        
                } catch( Throwable t) {
-                       t.printStackTrace();
-                       System.out.println("Exiting Process Due to 
Unrecoverable Error");
+                       logger.log(Level.SEVERE, "<<<<<<<< Exiting Process Due 
to Unrecoverable Error:", t);
                        Runtime.getRuntime().halt(99);   // User Error
                }
 
        }
        
        /*
-        * Terminate the service connection
+        * Terminate service connection
         */
        public void stop() {
                try {
-                       stopMethod.invoke(duccContainerInstance);
+                       service.stop();
                } catch( Throwable t) {
                        logger.log(Level.SEVERE, "Stop failed");
-                       t.printStackTrace();
                }
        }
 
@@ -211,5 +204,7 @@ public class DuccJobService {
                        e.printStackTrace();
                }
        }
+       
+       
 
 }

Added: 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/IServiceWrapper.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/IServiceWrapper.java?rev=1836115&view=auto
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/IServiceWrapper.java
 (added)
+++ 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/IServiceWrapper.java
 Tue Jul 17 13:49:47 2018
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.ducc.user.common.main;
+
+public interface IServiceWrapper {
+       public void initialize(String[] args) throws Exception;
+       public void start() throws Exception;
+       public void stop() throws Exception;
+}

Added: 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/PullServiceWrapper.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/PullServiceWrapper.java?rev=1836115&view=auto
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/PullServiceWrapper.java
 (added)
+++ 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/PullServiceWrapper.java
 Tue Jul 17 13:49:47 2018
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.user.common.main;
+
+import java.lang.reflect.Method;
+
+/*
+ * Implements dynamic loading of a service class, initializes its instance
+ * and starts it. 
+ */
+
+public class PullServiceWrapper implements IServiceWrapper{
+
+       private Object instance = null;
+       private Class<?> duccServiceClass =  null;
+       
+       public void initialize(String[] args) throws Exception{
+               duccServiceClass = 
+                               
ClassLoader.getSystemClassLoader().loadClass("org.apache.uima.ducc.ps.service.main.ServiceWrapper");
+               instance = duccServiceClass.newInstance();
+               Method initMethod = duccServiceClass.getMethod("initialize", 
String[].class);
+               initMethod.invoke(instance, (Object) args);
+       }
+       public void start() throws Exception {
+               Method startMethod = duccServiceClass.getMethod("start");
+               startMethod.invoke(instance);
+       }
+
+       public void stop() throws Exception {
+               Method stopMethod = duccServiceClass.getMethod("stop");
+               stopMethod.invoke(instance);
+
+       }
+}

Added: 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/ServiceFactory.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/ServiceFactory.java?rev=1836115&view=auto
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/ServiceFactory.java
 (added)
+++ 
uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/ServiceFactory.java
 Tue Jul 17 13:49:47 2018
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.ducc.user.common.main;
+
+public class ServiceFactory {
+       
+       private ServiceFactory() {}
+
+       public static IServiceWrapper newService() throws 
ClassNotFoundException, InstantiationException, IllegalAccessException { 
+               Class<?> serviceClass = null;
+               String serviceClassName = null;
+               // use custom service class or default. Custom class must 
implement ServiceWrapper
+               if ( ( serviceClassName = 
System.getProperty("ducc.service.class")) != null ) {
+                       serviceClass = 
Thread.currentThread().getContextClassLoader().loadClass(serviceClassName);
+                       return (IServiceWrapper)serviceClass.newInstance();
+               } else {
+                       // use default
+                       return new PullServiceWrapper();
+               }
+       }
+}


Reply via email to