Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java?rev=1836115&view=auto ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java (added) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java Tue Jul 17 13:49:47 2018 @@ -0,0 +1,659 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ +package org.apache.uima.ducc.ps.service.processor.uima; + +import java.io.File; +import java.io.FileInputStream; +import java.lang.reflect.Method; +import java.net.BindException; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; + +import org.apache.uima.UIMAFramework; +import org.apache.uima.cas.CAS; +import org.apache.uima.ducc.ps.service.IScaleable; +import org.apache.uima.ducc.ps.service.IServiceState; +import org.apache.uima.ducc.ps.service.ServiceConfiguration; +import org.apache.uima.ducc.ps.service.dgen.DeployableGenerator; +import org.apache.uima.ducc.ps.service.dgen.DuccUimaReferenceByName; +import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action; +import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException; +import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor; +import org.apache.uima.ducc.ps.service.monitor.builtin.RemoteStateObserver; +import org.apache.uima.ducc.ps.service.processor.IProcessResult; +import org.apache.uima.ducc.ps.service.processor.IServiceProcessor; +import org.apache.uima.ducc.ps.service.processor.IServiceResultSerializer; +import org.apache.uima.ducc.ps.service.processor.uima.utils.PerformanceMetrics; +import org.apache.uima.ducc.ps.service.processor.uima.utils.UimaResultDefaultSerializer; +import org.apache.uima.ducc.ps.service.utils.UimaSerializer; +import org.apache.uima.ducc.ps.service.utils.Utils; +import org.apache.uima.resource.metadata.impl.TypeSystemDescription_impl; +import org.apache.uima.util.CasCreationUtils; +import org.apache.uima.util.Level; +import org.apache.uima.util.Logger; + +public class UimaAsServiceProcessor extends AbstractServiceProcessor implements IServiceProcessor, IScaleable { + private static final Class<?> CLASS_NAME = UimaAsServiceProcessor.class; + + Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class); + // Map to store DuccUimaSerializer instances. Each has affinity to a thread +// private Map<Long, UimaSerializer> serializerMap = new HashMap<>(); + private static Object brokerInstance = null; + private UimaAsClientWrapper uimaASClient = null; + private String saxonURL = null; + private String xslTransform = null; + protected Object initializeMonitor = new Object(); + private static volatile boolean brokerRunning = false; + private static final char FS = System.getProperty("file.separator").charAt(0); + + private static final CountDownLatch brokerLatch = new CountDownLatch(1); + public static final String brokerPropertyName = "ducc.broker.name"; + public static final String queuePropertyName = "ducc.queue.name"; + public static final String duccNodeName = "DUCC_NODENAME"; + private IServiceResultSerializer resultSerializer; + private static Class<?> classToLaunch = null; + private String aeName = ""; + private int scaleout = 1; + private static Object platformMBeanServer; + private ServiceConfiguration serviceConfiguration; + private IServiceMonitor monitor; + public volatile boolean initialized = false; + private String[] deploymentDescriptors = null; + private String[] ids = null; + private String duccHome = null; + boolean enablePerformanceBreakdownReporting = false; + private AtomicInteger numberOfInitializedThreads = new AtomicInteger(); + + static { + // try to get platform MBean Server (Java 1.5 only) + try { + Class<?> managementFactory = Class.forName("java.lang.management.ManagementFactory"); + Method getPlatformMBeanServer = managementFactory.getMethod("getPlatformMBeanServer", new Class[0]); + platformMBeanServer = getPlatformMBeanServer.invoke(null, (Object[]) null); + } catch (Exception e) { + platformMBeanServer = null; + } + } + private String[] args; + + public UimaAsServiceProcessor(String[] args, ServiceConfiguration serviceConfiguration) { + this.args = args; + this.serviceConfiguration = serviceConfiguration; + // start a thread which will collect AE initialization state + launchStateInitializationCollector(); + + } + + @Override + public void setScaleout(int howManyThreads) { + scaleout = howManyThreads; + } + + @Override + public int getScaleout() { + return scaleout; + } + + private int generateDescriptorsAndGetScaleout(String[] args) throws Exception { + deploymentDescriptors = getDescriptors(args); + ids = new String[deploymentDescriptors.length]; + return scaleout; + } + + private void enableMetricsIfNewUimaAs() throws Exception{ + // inner class hiding java reflection code to access uima-as class + UimaAsVersionWrapper uimaVersion = new UimaAsVersionWrapper(); + + int majorVersion = uimaVersion.getMajor(); + int minorVersion = uimaVersion.getMinor(); + int buildRevision = uimaVersion.getRevision(); + + // enable performance breakdown reporting when support is added in the next UIMA + // AS release after 2.6.0 + // (assumes the fix will be after the current 2.6.1-SNAPSHOT level) + if (majorVersion > 2 || (majorVersion == 2 && (minorVersion > 6 || (minorVersion == 6 && buildRevision > 1)))) { + enablePerformanceBreakdownReporting = true; + } + } + private void launchStateInitializationCollector() { + monitor = + new RemoteStateObserver(serviceConfiguration, logger); + } + @Override + public void initialize() throws ServiceInitializationException { + try { + duccHome = System.getProperty("DUCC_HOME"); + String pid = Utils.getPID("Queue"); + String endpointName; // This is for the local within-process UIMA-AS client/server + if (System.getenv(duccNodeName) != null) { + endpointName = System.getenv(duccNodeName) + pid; + } else { + endpointName = InetAddress.getLocalHost().getCanonicalHostName() + pid; + } + // Needed to resolve ${queue.name} placeholder in DD generated by DUCC + System.setProperty(queuePropertyName, endpointName); + + // generate Spring context file once + synchronized (UimaAsServiceProcessor.class) { + // every process thread has its own uima deserializer + serializerMap.put(Thread.currentThread().getId(), new UimaSerializer()); + if (!initialized) { + monitor.onStateChange(IServiceState.State.Initializing.toString(), new Properties()); + + Properties duccProperties = loadDuccProperties(); + String saxonPath = + resolvePlaceholders(duccProperties.getProperty("ducc.uima-as.saxon.jar.path"), System.getProperties() ); + String xsltPath = + resolvePlaceholders(duccProperties.getProperty("ducc.uima-as.dd2spring.xsl.path"), System.getProperties() ); + List<String> argList = new ArrayList<>(); + argList.add("-d"); + argList.add(args[0]); + argList.add("-saxonURL"); + argList.add(saxonPath); + argList.add("-xslt"); + argList.add(xsltPath); + + if ( System.getProperty("ducc.deploy.JpThreadCount") != null ) { + argList.add("-t"); + argList.add(System.getProperty("ducc.deploy.JpThreadCount")); + } + + enableMetricsIfNewUimaAs(); + resultSerializer = new UimaResultDefaultSerializer(); + uimaASClient = new UimaAsClientWrapper(); + scaleout = generateDescriptorsAndGetScaleout(argList.toArray(new String[argList.size()])); // Also converts the DD if necessary + if ( scaleout == 0 ) { + scaleout = 1; + } + initialized = true; + } + doDeploy(); + } + if ( numberOfInitializedThreads.incrementAndGet() == scaleout ) { + super.delay(logger, 30000); + monitor.onStateChange(IServiceState.State.Running.toString(), new Properties()); + } + + + } catch( Exception e) { + logger.log(Level.WARNING, null, e); + monitor.onStateChange(IServiceState.State.FailedInitialization.toString(), new Properties()); + throw new ServiceInitializationException("",e); + } + + } + public String resolvePlaceholders(String contents, Properties props ) + { + // Placeholders syntax ${<placeholder>} + Pattern placeHolderPattern = Pattern.compile("\\$\\{(.*?)\\}"); + + java.util.regex.Matcher matcher = + placeHolderPattern.matcher(contents); + + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + // extract placeholder + final String key = matcher.group(1); + // Find value for extracted placeholder. + String placeholderValue = props.getProperty(key); + if (placeholderValue == null) { + placeholderValue = System.getProperty(key); + if (placeholderValue == null) { + throw new IllegalArgumentException("Missing value for placeholder: " + key); + } + } + matcher.appendReplacement(sb, placeholderValue); + } + matcher.appendTail(sb); + return sb.toString(); + } + @Override + public IProcessResult process(String serializedTask) { + CAS cas = null; + IProcessResult result; + try { + cas = uimaASClient.getCAS(); + // DUCC JP services are given a serialized CAS ... others just the doc-text for a CAS + if (serviceConfiguration.getJpType() != null) { + // Use thread dedicated UimaSerializer to de-serialize the CAS + getUimaSerializer().deserializeCasFromXmi(serializedTask, cas); + } else { + cas.setDocumentText(serializedTask); + cas.setDocumentLanguage("en"); + } + + List<PerformanceMetrics> casMetrics = new ArrayList<>(); + + if (enablePerformanceBreakdownReporting) { + List<?> perfMetrics = new ArrayList<>(); + + try { + uimaASClient.sendAndReceive(cas, perfMetrics); + } catch (Exception t) { + logger.log(Level.WARNING, "", t); + result = new UimaProcessResult(t, Action.TERMINATE); + return result; + } + + for (Object metrics : perfMetrics) { + Method nameMethod = metrics.getClass().getDeclaredMethod("getName"); + String name = (String) nameMethod.invoke(metrics); + Method uniqueNameMethod = metrics.getClass().getDeclaredMethod("getUniqueName"); + String uniqueName = (String) uniqueNameMethod.invoke(metrics); + Method analysisTimeMethod = metrics.getClass().getDeclaredMethod("getAnalysisTime"); + long analysisTime = (long) analysisTimeMethod.invoke(metrics); + + boolean aggregate = uniqueName.startsWith("/" + name); + int pos = uniqueName.indexOf("/", 1); + if (pos > -1 && scaleout > 1 && name != null && aggregate) { + String st = uniqueName.substring(pos); + uniqueName = "/" + name + st; + } + PerformanceMetrics pm = new PerformanceMetrics(name, uniqueName, analysisTime); + casMetrics.add(pm); + } + } else { + // delegate processing to the UIMA-AS service and wait for a reply + try { + uimaASClient.sendAndReceive(cas); + } catch (Exception t) { + logger.log(Level.WARNING, "", t); + result = new UimaProcessResult(t, Action.TERMINATE); + return result; + } + PerformanceMetrics pm = new PerformanceMetrics( + "Performance Metrics Not Supported For DD Jobs and UIMA-AS <= v2.6.0", + "Performance Metrics Not Supported For DD Jobs and UIMA-AS <= v2.6.0 ", 0); + casMetrics.add(pm); + + } + return new UimaProcessResult(resultSerializer.serialize(casMetrics)); + + } catch (Exception t) { + logger.log(Level.WARNING, "", t); + result = new UimaProcessResult(t, Action.TERMINATE); + return result; + } finally { + if (cas != null) { + cas.release(); + } + } + } + private void doDeploy() throws Exception { + // deploy singleUIMA-AS Version instance of embedded broker + try { + // below code runs once to create broker, uima-as client and + // uima-as service + if (brokerInstance == null) { + deployBroker(duccHome); + // Broker is running + brokerRunning = true; + + int i = 0; + // Deploy UIMA-AS services + for (String dd : deploymentDescriptors) { + // Deploy UIMA-AS service. Keep the deployment id so + // that we can undeploy uima-as service on stop. + ids[i] = uimaASClient.deployService(dd); + } + // send GetMeta to UIMA-AS service and wait for a reply + uimaASClient.initialize(); + } + + } catch (Throwable e) { + logger.log(Level.WARNING, "UimaAsServiceProcesser", e); + throw new RuntimeException(e); + + } + } + private Properties loadDuccProperties() throws Exception { + Properties duccProperties = new Properties(); + duccProperties.load(new FileInputStream(System.getProperty("ducc.deploy.configuration"))); + return duccProperties; + } + /** + * Extract descriptors from arg list. Also extract xsl processor and saxon url. + * Parse the DD to fetch scaleout property & convert the DD if necessary. + * + * @param args + * - java argument list + * @return - an array of DDs + * + * @throws Exception + */ + private String[] getDescriptors(String[] args) throws Exception { + int nbrOfArgs = args.length; + String[] deploymentDescriptors = Utils.getMultipleArg("-d", args); + if (deploymentDescriptors.length == 0) { + // allow multiple args for one key + deploymentDescriptors = Utils.getMultipleArg2("-dd", args); + } + for( String arg : args) { + logger.log(Level.INFO,"+++++++++++++++ arg:"+arg); + } + saxonURL = Utils.getArg("-saxonURL", args); + xslTransform = Utils.getArg("-xslt", args); + String threadCount = Utils.getArg("-t", args); // Will be null if ducc.deploy.JpThreadCount is undefined + if ( threadCount.isEmpty()) { + threadCount = "1"; // default + } + if (nbrOfArgs < 1 || (deploymentDescriptors.length == 0 || saxonURL.equals("") || xslTransform.equals(""))) { + printUsageMessage(); + return null; // Done here + } + deploymentDescriptors[0] = parseDD(deploymentDescriptors[0], threadCount); + return deploymentDescriptors; + } + + /** + * Parses given Deployment Descriptor to extract scaleout and to check if it has + * placeholders for the queue & broker. Generates a converted one if necessary, + * e.g. for "pull services. The scaleout is used for "pull" services and should + * become the default for DD jobs. + * + * @param ddPath + * - path to the DD + * @param threadCount + * - pipeline scaleout value null if undefined) + * @return original or converted DD + * @throws Exception + */ + public String parseDD(String ddPath, String threadCount) throws Exception { + // For "custom" pull-services must convert the DD so it can be deployed locally + String logfile=System.getenv("DUCC_PROCESS_LOG_PREFIX"); + // For JPs this should return the same file, already converted by the JD + String logDir = new File(logfile).getParent(); + + DeployableGenerator deployableGenerator = new DeployableGenerator(logDir); + DuccUimaReferenceByName configuration = new DuccUimaReferenceByName(0, ddPath); + String ddNew = deployableGenerator.generateDd(configuration, System.getenv("DUCC_JOBID"), true); + + if (ddNew != ddPath) { + logger.log(Level.INFO, "Generated " + ddNew + " from " + ddPath); + } + + int ddScaleout = deployableGenerator.getScaleout(); + if (threadCount == null) { + scaleout = ddScaleout; + logger.log(Level.INFO, "DD specifies a scaleout of " + scaleout); + } else { + scaleout = Integer.parseInt(threadCount); + if (ddScaleout != scaleout) { + logger.log(Level.WARNING, + "Scaleout specified as " + scaleout + " but the DD is configured for " + ddScaleout); + } + } + + return ddNew; + } + + @Override + public void stop() { + synchronized (UimaAsServiceProcessor.class) { + if (brokerRunning) { + logger.log(Level.INFO, "Stopping UIMA_AS Client"); + try { + // Prevent UIMA-AS from exiting + System.setProperty("dontKill", "true"); + uimaASClient.stop(); + + Method brokerStopMethod = classToLaunch.getMethod("stop"); + brokerStopMethod.invoke(brokerInstance); + + Method waitMethod = classToLaunch.getMethod("waitUntilStopped"); + waitMethod.invoke(brokerInstance); + brokerRunning = false; + + } catch (Exception e) { + logger.log(Level.WARNING, "stop", e); + } + + } + } + + } + + private void deployBroker(String duccHome) throws Exception { + // Save current context class loader. When done loading the broker jars + // this class loader will be restored + ClassLoader currentCL = Thread.currentThread().getContextClassLoader(); + HashMap<String, String> savedPropsMap = null; + + try { + // setup a classpath for Ducc broker + String[] brokerClasspath = new String[] { + duccHome + File.separator + "apache-uima" + File.separator + "apache-activemq" + File.separator + + "lib" + File.separator + "*", + duccHome + File.separator + "apache-uima" + File.separator + "apache-activemq" + File.separator + + "lib" + File.separator + "optional" + File.separator + "*" }; + + // isolate broker in its own Class loader + URLClassLoader ucl = Utils.create(brokerClasspath); + Thread.currentThread().setContextClassLoader(ucl); + savedPropsMap = Utils.hideLoggingProperties(); // Ensure DUCC doesn't try to use the user's logging setup + + classToLaunch = ucl.loadClass("org.apache.activemq.broker.BrokerService"); + if (System.getProperty("ducc.debug") != null) { + Utils.dump(ucl, 4); + } + brokerInstance = classToLaunch.newInstance(); + + Method setDedicatedTaskRunnerMethod = classToLaunch.getMethod("setDedicatedTaskRunner", boolean.class); + setDedicatedTaskRunnerMethod.invoke(brokerInstance, false); + + Method setPersistentMethod = classToLaunch.getMethod("setPersistent", boolean.class); + setPersistentMethod.invoke(brokerInstance, false); + + int port = 61626; // try to start the colocated broker with this port first + String brokerURL = "tcp://localhost:"; + // loop until a valid port is found for the broker + while (true) { + try { + Method addConnectorMethod = classToLaunch.getMethod("addConnector", String.class); + addConnectorMethod.invoke(brokerInstance, brokerURL + port); + + Method startMethod = classToLaunch.getMethod("start"); + startMethod.invoke(brokerInstance); + + Method waitUntilStartedMethod = classToLaunch.getMethod("waitUntilStarted"); + waitUntilStartedMethod.invoke(brokerInstance); + System.setProperty("DefaultBrokerURL", brokerURL + port); + System.setProperty("BrokerURI", brokerURL + port); + // Needed to resolve ${broker.name} placeholder in DD generated by DUCC + System.setProperty(brokerPropertyName, brokerURL + port); + + break; // got a valid port for the broker + } catch (Exception e) { + if (isBindException(e)) { + port++; + } else { + throw new RuntimeException(e); + } + } + } + + } catch (Exception e) { + throw e; + } finally { + // restore context class loader + Thread.currentThread().setContextClassLoader(currentCL); + brokerLatch.countDown(); + Utils.restoreLoggingProperties(savedPropsMap); // May not be necessary as user's logger has been established + } + + } + + private boolean isBindException(Throwable e) { + if (e == null) { + return false; + } + + if (e instanceof BindException) { + return true; + } else if (e instanceof SocketException && "Address already in use".equals(e.getMessage())) { + return true; + } else if (e.getCause() != null) { + return isBindException(e.getCause()); + } else { + return false; + } + } + + private static void printUsageMessage() { + System.out.println(" Arguments to the program are as follows : \n" + + "-d path-to-UIMA-Deployment-Descriptor [-d path-to-UIMA-Deployment-Descriptor ...] \n" + + "-saxon path-to-saxon.jar \n" + "-q top level service queue name \n" + + "-xslt path-to-dd2spring-xslt\n" + " or\n" + + "path to Spring XML Configuration File which is the output of running dd2spring\n"); + } + public static void main(String[] args) { + try { + UimaAsServiceProcessor processor = + new UimaAsServiceProcessor(args, null); + processor.initialize(); + CAS cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, null); + cas.setDocumentLanguage("en"); + cas.setDocumentText("Test"); + UimaSerializer serializer = + new UimaSerializer(); + String serializedCas = serializer.serializeCasToXmi(cas); + + IProcessResult result = + processor.process(serializedCas); + System.out.println("Client Received Result - Success:"+(result.getResult()!=null)); + processor.stop(); + } catch( Exception e) { + e.printStackTrace(); + } + + } + private class UimaAsClientWrapper { + private Object uimaASClient; + private Class<?> clientClz; + + public UimaAsClientWrapper() throws Exception { + clientClz = Class + .forName("org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl"); + uimaASClient = clientClz.newInstance(); + + } + + public String deployService(String aDeploymentDescriptorPath) throws Exception { + + Map<String, Object> appCtx = new HashMap<>(); + + Class<?> clz = Class.forName("org.apache.uima.aae.client.UimaAsynchronousEngine"); + + appCtx.put((String) clz.getField("DD2SpringXsltFilePath").get(uimaASClient), xslTransform.replace('/', FS)); + appCtx.put((String) clz.getField("SaxonClasspath").get(uimaASClient), saxonURL.replace('/', FS)); + appCtx.put((String) clz.getField("CasPoolSize").get(uimaASClient), scaleout); + + String containerId = null; + // use UIMA-AS client to deploy the service using provided + // Deployment Descriptor + ClassLoader duccCl = Thread.currentThread().getContextClassLoader(); + ClassLoader cl = this.getClass().getClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + Method deployMethod = uimaASClient.getClass().getDeclaredMethod("deploy", String.class, Map.class); + containerId = (String) deployMethod.invoke(uimaASClient, + new Object[] { aDeploymentDescriptorPath, appCtx }); + Thread.currentThread().setContextClassLoader(duccCl); + return containerId; + } + + private void initialize() throws Exception { + + String endpoint = System.getProperty(queuePropertyName); + String brokerURL = System.getProperty(brokerPropertyName); + Map<String, Object> appCtx = new HashMap<>(); + Class<?> clz = Class.forName("org.apache.uima.aae.client.UimaAsynchronousEngine"); + + appCtx.put((String) clz.getField("ServerUri").get(uimaASClient), brokerURL); + appCtx.put((String) clz.getField("ENDPOINT").get(uimaASClient), endpoint); + appCtx.put((String) clz.getField("CasPoolSize").get(uimaASClient), scaleout); + appCtx.put((String) clz.getField("Timeout").get(uimaASClient), 0); + appCtx.put((String) clz.getField("GetMetaTimeout").get(uimaASClient), 0); + appCtx.put((String) clz.getField("CpcTimeout").get(uimaASClient), 1100); + Method initMethod = uimaASClient.getClass().getMethod("initialize", Map.class); + initMethod.invoke(uimaASClient, new Object[] { appCtx }); + + // blocks until the client initializes + Method getMetaMethod = uimaASClient.getClass().getMethod("getMetaData"); + Object meta = getMetaMethod.invoke(uimaASClient); + + Method nameMethod = meta.getClass().getMethod("getName"); + aeName = (String) nameMethod.invoke(meta); + } + public CAS getCAS() throws Exception { + Method getCasMethod = uimaASClient.getClass().getMethod("getCAS"); + return (CAS) getCasMethod.invoke(uimaASClient); + } + public void sendAndReceive(CAS cas, List<?> perfMetrics) throws Exception { + Method sendMethod = uimaASClient.getClass().getMethod("sendAndReceiveCAS", CAS.class, + List.class); + sendMethod.invoke(uimaASClient, new Object[] { cas, perfMetrics }); + } + public void sendAndReceive(CAS cas) throws Exception { + Method sendMethod = uimaASClient.getClass().getDeclaredMethod("sendAndReceiveCAS", CAS.class); + sendMethod.invoke(uimaASClient, new Object[] { cas }); + } + public void stop() throws Exception { + Method clientStopMethod = uimaASClient.getClass().getDeclaredMethod("stop"); + clientStopMethod.invoke(uimaASClient); + } + + + } + private class UimaAsVersionWrapper { + Class<?> clz = null; + Method m = null; + + + public UimaAsVersionWrapper() throws Exception { + clz = Class.forName("org.apache.uima.aae.UimaAsVersion"); + } + + public String getFullVersion() throws Exception { + m = clz.getDeclaredMethod("getFullVersionString"); + return (String)m.invoke(null); + } + public int getMajor() throws Exception { + Method majorVersionMethod = clz.getDeclaredMethod("getMajorVersion"); + return (short) majorVersionMethod.invoke(null); + } + public int getMinor() throws Exception { + Method minorVersionMethod = clz.getDeclaredMethod("getMinorVersion"); + return (short) minorVersionMethod.invoke(null); + } + public int getRevision() throws Exception { + Method buildRevisionMethod = clz.getDeclaredMethod("getBuildRevision"); + return (short) buildRevisionMethod.invoke(null); + } + + } +}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java Tue Jul 17 13:49:47 2018 @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -56,18 +56,17 @@ import org.apache.uima.util.Level; import org.apache.uima.util.Logger; import org.apache.uima.util.XMLInputSource; -public class UimaServiceProcessor implements IServiceProcessor { +public class UimaServiceProcessor extends AbstractServiceProcessor implements IServiceProcessor { public static final String IMPORT_BY_NAME_PREFIX = "*importByName:"; Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class); // Map to store DuccUimaSerializer instances. Each has affinity to a thread - private Map<Long, UimaSerializer> serializerMap; private IServiceResultSerializer resultSerializer; // stores AE instance pinned to a thread - private ThreadLocal<AnalysisEngine> threadLocal = + private ThreadLocal<AnalysisEngine> threadLocal = new ThreadLocal<> (); private ReentrantLock initStateLock = new ReentrantLock(); private boolean sendInitializingState = true; - private ResourceManager rm = + private ResourceManager rm = UIMAFramework.newDefaultResourceManager();; private CasPool casPool = null; private int scaleout=1; @@ -78,7 +77,7 @@ public class UimaServiceProcessor implem private ServiceConfiguration serviceConfiguration; private IServiceMonitor monitor; private AtomicInteger numberOfInitializedThreads = new AtomicInteger(); - + static { // try to get platform MBean Server (Java 1.5 only) try { @@ -88,8 +87,8 @@ public class UimaServiceProcessor implem } catch (Exception e) { platformMBeanServer = null; } - } - + } + public UimaServiceProcessor(String analysisEngineDescriptor) { this(analysisEngineDescriptor, new UimaResultDefaultSerializer(), new ServiceConfiguration()); } @@ -106,7 +105,7 @@ public class UimaServiceProcessor implem serializerMap = new HashMap<>(); } } - + private void launchStateInitializationCollector() { monitor = new RemoteStateObserver(serviceConfiguration, logger); @@ -120,9 +119,10 @@ public class UimaServiceProcessor implem @Override public void initialize() { + if ( logger.isLoggable(Level.FINE)) { logger.log(Level.FINE, "Process Thread:"+ Thread.currentThread().getName()+" Initializing AE"); - + } try { // multiple threads may call this method. Send initializing state once @@ -132,7 +132,7 @@ public class UimaServiceProcessor implem monitor.onStateChange(IServiceState.State.Initializing.toString(), new Properties()); } } catch( Exception e) { - + } finally { initStateLock.unlock(); } @@ -141,9 +141,9 @@ public class UimaServiceProcessor implem HashMap<String,Object> paramsMap = new HashMap<>(); paramsMap.put(Resource.PARAM_RESOURCE_MANAGER, rm); paramsMap.put(AnalysisEngine.PARAM_MBEAN_SERVER, platformMBeanServer); - + try { - + XMLInputSource is = UimaUtils.getXMLInputSource(analysisEngineDescriptor); String aed = is.getURL().toString(); @@ -160,22 +160,23 @@ public class UimaServiceProcessor implem initializeCasPool(ae.getAnalysisEngineMetaData()); } } - // every process thread has its own uima deserializer if (serviceConfiguration.getJpType() != null) { serializerMap.put(Thread.currentThread().getId(), new UimaSerializer()); } } catch (Exception e) { + logger.log(Level.WARNING, null, e); monitor.onStateChange(IServiceState.State.FailedInitialization.toString(), new Properties()); throw new RuntimeException(e); - } + } if ( logger.isLoggable(Level.INFO)) { logger.log(Level.INFO, "Process Thread:"+ Thread.currentThread().getName()+" Done Initializing AE"); - + } if ( numberOfInitializedThreads.incrementAndGet() == scaleout ) { + super.delay(logger, 30000); monitor.onStateChange(IServiceState.State.Running.toString(), new Properties()); } } @@ -188,17 +189,17 @@ public class UimaServiceProcessor implem casPool = new CasPool(scaleout, analysisEngineMetadata, rm); } - private UimaSerializer getUimaSerializer() { - return serializerMap.get(Thread.currentThread().getId()); - } - +// private UimaSerializer getUimaSerializer() { +// +// return serializerMap.get(Thread.currentThread().getId()); +// } @Override public IProcessResult process(String serializedTask) { AnalysisEngine ae = null; CAS cas = casPool.getCas(); IProcessResult result; - + try { // DUCC JP services are given a serialized CAS ... others just the doc-text for a CAS if (serviceConfiguration.getJpType() != null) { @@ -207,36 +208,27 @@ public class UimaServiceProcessor implem cas.setDocumentText(serializedTask); cas.setDocumentLanguage("en"); } - // check out AE instance pinned to this thread ae = threadLocal.get(); // get AE metrics before calling process(). Needed for // computing a delta - List<PerformanceMetrics> beforeAnalysis = + List<PerformanceMetrics> beforeAnalysis = UimaMetricsGenerator.get(ae); - + // ***************************************************** // PROCESS // ***************************************************** ae.process(cas); - + // ***************************************************** - // No exception in process() , fetch metrics + // No exception in process() , fetch metrics // ***************************************************** - List<PerformanceMetrics> afterAnalysis = + List<PerformanceMetrics> afterAnalysis = UimaMetricsGenerator.get(ae); // get the delta - List<PerformanceMetrics> casMetrics = + List<PerformanceMetrics> casMetrics = UimaMetricsGenerator.getDelta( afterAnalysis, beforeAnalysis); - -// StringBuilder sb = new StringBuilder("{"); -// -// for (AnalysisEnginePerformanceMetrics metrics : casMetrics) { -// sb.append(resultSerializer.serialize(metrics)).append("}"); -// } -// sb.append("}"); - return new UimaProcessResult(resultSerializer.serialize(casMetrics)); } catch( Exception e ) { logger.log(Level.WARNING,"",e); @@ -244,14 +236,14 @@ public class UimaServiceProcessor implem return result; } finally { - + if (cas != null) { casPool.releaseCas(cas); } } } - + public void setErrorHandler(IServiceErrorHandler errorHandler) { } @@ -266,7 +258,7 @@ public class UimaServiceProcessor implem } } catch( Exception e) { logger.log(Level.WARNING, "stop", e); - } + } } /* // Build just an AE from parts and return the filename @@ -293,3 +285,4 @@ public class UimaServiceProcessor implem } */ } + Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java Tue Jul 17 13:49:47 2018 @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,8 +44,8 @@ import org.apache.uima.util.Level; import org.apache.uima.util.Logger; /** - * - * This protocol handler is a Runnable + * + * This protocol handler is a Runnable * */ public class DefaultServiceProtocolHandler implements IServiceProtocolHandler { @@ -66,13 +66,13 @@ public class DefaultServiceProtocolHandl private IService service; // forces process threads to initialize serially private ReentrantLock initLock = new ReentrantLock(); - + private static AtomicInteger idGenerator = new AtomicInteger(); - - private DefaultServiceProtocolHandler(Builder builder) { - this.initLatch = builder.initLatch; - this.stopLatch = builder.stopLatch; + + private DefaultServiceProtocolHandler(Builder builder) { + this.initLatch = builder.initLatch; + this.stopLatch = builder.stopLatch; this.service = builder.service; this.transport = builder.transport; this.processor = builder.processor; @@ -100,11 +100,12 @@ public class DefaultServiceProtocolHandl // use a lock to serialize initialization one thread at a time initLock.lock(); processor.initialize(); - } catch (Exception e) { + } catch (Throwable e) { initError = true; running = false; + logger.log(Level.WARNING, "ProtocolHandler initialize() failed -",e); throw new ServiceInitializationException( - "Thread:" + Thread.currentThread().getName() + " Failed initialization", e); + "Thread:" + Thread.currentThread().getName() + " Failed initialization"); } finally { initLatch.countDown(); @@ -138,7 +139,7 @@ public class DefaultServiceProtocolHandl throw new TransportException("Received invalid content (null) in response from client - rejecting request"); } o = XStreamUtils.unmarshall(content); - + } catch ( Exception e) { if ( !running ) { throw new TransportException("Service stopping - rejecting request"); @@ -171,7 +172,7 @@ public class DefaultServiceProtocolHandl } private IMetaTaskTransaction callGet(IMetaTaskTransaction transaction) throws Exception { - transaction.setType(Type.Get); + transaction.setType(Type.Get); if ( logger.isLoggable(Level.FINE)) { logger.log(Level.FINE, "ProtocolHandler calling GET"); } @@ -179,7 +180,7 @@ public class DefaultServiceProtocolHandl } /** * Block until service start() is called - * + * * @throws ServiceInitializationException */ private void awaitStart() throws ServiceInitializationException { @@ -194,25 +195,19 @@ public class DefaultServiceProtocolHandl // we may fail in initialize() in which case the ServiceInitializationException // is thrown initialize(); - + // now wait for application to call start awaitStart(); - + // all threads intialized, enter running state IMetaTaskTransaction transaction = null; - + if ( logger.isLoggable(Level.INFO)) { logger.log(Level.INFO, ".............. Thread "+Thread.currentThread().getId() + " ready to process"); } - logger.log(Level.INFO, "Wait for the initialized state to propagate to the SM " + - "so any processing errors are not treates as initialization failures"); - try { - Thread.sleep(30000); - } catch (InterruptedException e1) { - }; - + while (running) { try { @@ -224,13 +219,13 @@ public class DefaultServiceProtocolHandl break; } if (transaction.getMetaTask() == null || transaction.getMetaTask().getUserSpaceTask() == null ) { - // the client has no tasks to give. + // the client has no tasks to give. noTaskStrategy.handleNoTaskSupplied(); continue; } Object task = transaction.getMetaTask().getUserSpaceTask(); - - // send ACK + + // send ACK transaction = callAck(transaction); if (!running) { break; @@ -265,8 +260,8 @@ public class DefaultServiceProtocolHandl }).start(); running = false; } - - + + } catch( IllegalStateException e) { break; @@ -275,14 +270,17 @@ public class DefaultServiceProtocolHandl } catch (Exception e) { logger.log(Level.WARNING,"",e); - } + } } stopLatch.countDown(); + if ( processor != null ) { + processor.stop(); + } logger.log(Level.INFO,"ProtocolHandler terminated"); return String.valueOf(Thread.currentThread().getId()); } - + private void delegateStop() { service.stop(); } @@ -308,8 +306,8 @@ public class DefaultServiceProtocolHandl public void setTransport(IServiceTransport transport) { this.transport = transport; } - - + + public static class Builder { private IServiceTransport transport; private IServiceProcessor processor; @@ -326,15 +324,15 @@ public class DefaultServiceProtocolHandl public Builder withProcessor(IServiceProcessor processor) { this.processor = processor; return this; - } + } public Builder withInitCompleteLatch(CountDownLatch initLatch) { this.initLatch = initLatch; return this; - } + } public Builder withDoneLatch(CountDownLatch stopLatch) { this.stopLatch = stopLatch; return this; - } + } public Builder withNoTaskStrategy(INoTaskAvailableStrategy strategy) { this.strategy = strategy; return this; Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java Tue Jul 17 13:49:47 2018 @@ -1,21 +1,4 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. -*/ + package org.apache.uima.ducc.ps.service; import java.util.Timer; @@ -39,6 +22,7 @@ public class JunitPullServiceTestCase ex int scaleout = 2; super.startJetty(false); // don't block String analysisEngineDescriptor = "TestAAE"; + System.setProperty("ducc.deploy.JpType", "uima"); IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor); @@ -52,7 +36,7 @@ public class JunitPullServiceTestCase ex service.initialize(); Timer fTimer = new Timer("testPullService Timer"); // after 5secs stop the pull service - fTimer.schedule(new MyTimerTask(service, fTimer), 5000); + fTimer.schedule(new MyTimerTask(service, fTimer), 35000); service.start(); @@ -81,7 +65,7 @@ public class JunitPullServiceTestCase ex System.out.println("----------- Starting Service ....."); Timer fTimer = new Timer(); //after 10sec stop the service - fTimer.schedule(new MyTimerTask(service, fTimer), 10000); + fTimer.schedule(new MyTimerTask(service, fTimer), 40000); service.start(); @@ -92,7 +76,7 @@ public class JunitPullServiceTestCase ex throw e; } } - + /* @Test public void testPullServiceWithProcessFailure() throws Exception { int scaleout = 2; @@ -112,7 +96,7 @@ public class JunitPullServiceTestCase ex service.initialize(); Timer fTimer = new Timer("testPullService Timer"); // after 5secs stop the pull service - fTimer.schedule(new MyTimerTask(service, fTimer), 5000); + fTimer.schedule(new MyTimerTask(service, fTimer), 35000); service.start(); @@ -125,7 +109,7 @@ public class JunitPullServiceTestCase ex } } - /* + @Test public void testPullServiceBadClientURL() throws Exception { int scaleout = 2; Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java Tue Jul 17 13:49:47 2018 @@ -37,6 +37,7 @@ public class JUnitServiceWrapperTestCase System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT")); super.startJetty(false); // don't block String analysisEngineDescriptor = "TestAAE"; + System.setProperty("ducc.deploy.JpType", "uima"); String tasURL = "http://localhost:8080/test"; try { @@ -49,7 +50,7 @@ public class JUnitServiceWrapperTestCase Timer fTimer = new Timer("testPullService Timer"); // after 5secs stop the pull service - fTimer.schedule(new MyTimerTask(service, fTimer), 5000); + fTimer.schedule(new MyTimerTask(service, fTimer), 35000); service.initialize(new String[] {analysisEngineDescriptor}); @@ -65,7 +66,7 @@ public class JUnitServiceWrapperTestCase } } - + /* @Test public void testPullServiceWrapperWithProcessFailure() throws Exception { //int scaleout = 2; @@ -105,6 +106,7 @@ public class JUnitServiceWrapperTestCase System.getProperties().remove("ProcessFail"); } } +*/ class MyTimerTask extends TimerTask { final ServiceWrapper service; final Timer fTimer; Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java (original) +++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverComponent.java Tue Jul 17 13:49:47 2018 @@ -33,7 +33,7 @@ import org.apache.uima.ducc.container.jd import org.apache.uima.ducc.container.jd.cas.CasManager; import org.apache.uima.ducc.container.jd.cas.CasManagerStats; import org.apache.uima.ducc.container.jd.mh.IMessageHandler; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction; import org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent; public class JobDriverComponent extends AbstractDuccComponent @@ -126,7 +126,7 @@ implements IJobDriverComponent { return logger; } - public void handleJpRequest(IMetaCasTransaction metaCasTransaction) throws Exception { + public void handleJpRequest(IMetaTaskTransaction metaCasTransaction) throws Exception { String location = "handleJpRequest"; try { IMessageHandler mh = JobDriver.getInstance().getMessageHandler(); Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java (original) +++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java Tue Jul 17 13:49:47 2018 @@ -37,9 +37,9 @@ import org.apache.uima.ducc.common.utils import org.apache.uima.ducc.common.utils.XStreamUtils; import org.apache.uima.ducc.common.utils.id.DuccId; import org.apache.uima.ducc.container.jd.mh.MessageHandler; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction; -import org.apache.uima.ducc.container.net.impl.MetaCasTransaction; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Direction; +import org.apache.uima.ducc.ps.net.impl.MetaTaskTransaction; import org.apache.uima.ducc.transport.DuccTransportConfiguration; import org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent; import org.apache.uima.ducc.transport.dispatcher.ProcessStateDispatcher; @@ -154,8 +154,8 @@ import org.springframework.context.annot } public void process(Exchange exchange) throws Exception { // Get the transaction object sent by the JP - IMetaCasTransaction imt = - exchange.getIn().getBody(MetaCasTransaction.class); + IMetaTaskTransaction imt = + exchange.getIn().getBody(MetaTaskTransaction.class); // process JP's request jdc.handleJpRequest(imt); @@ -259,12 +259,12 @@ import org.springframework.context.annot //request.getReader().read(content); logger.debug("doPost",jobid, "Http Request Body:::"+String.valueOf(content)); - IMetaCasTransaction imt=null; + IMetaTaskTransaction imt=null; //String t = String.valueOf(content); // imt = (IMetaCasTransaction) XStreamUtils // .unmarshall(t.trim()); - imt = (IMetaCasTransaction) XStreamUtils + imt = (IMetaTaskTransaction) XStreamUtils .unmarshall(content); MessageHandler.accumulateTimes("Unmarshall", post_stime); Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java (original) +++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/iface/IJobDriverComponent.java Tue Jul 17 13:49:47 2018 @@ -19,8 +19,8 @@ package org.apache.uima.ducc.transport.configuration.jd.iface; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction; public interface IJobDriverComponent { - public void handleJpRequest(IMetaCasTransaction metaCasTransaction) throws Exception; + public void handleJpRequest(IMetaTaskTransaction metaCasTransaction) throws Exception; } Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original) +++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Tue Jul 17 13:49:47 2018 @@ -44,15 +44,15 @@ import org.apache.uima.ducc.common.IDucc import org.apache.uima.ducc.common.NodeIdentity; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.XStreamUtils; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type; -import org.apache.uima.ducc.container.net.iface.IPerformanceMetrics; -import org.apache.uima.ducc.container.net.impl.MetaCasTransaction; -import org.apache.uima.ducc.container.net.impl.PerformanceMetrics; -import org.apache.uima.ducc.container.net.impl.TransactionId; import org.apache.uima.ducc.container.sd.ServiceRegistry; import org.apache.uima.ducc.container.sd.ServiceRegistry_impl; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Direction; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type; +import org.apache.uima.ducc.ps.net.iface.IPerformanceMetrics; +import org.apache.uima.ducc.ps.net.impl.MetaTaskTransaction; +import org.apache.uima.ducc.ps.net.impl.PerformanceMetrics; +import org.apache.uima.ducc.ps.net.impl.TransactionId; public class DuccHttpClient { private final static String REGISTERED_DRIVER = "ducc.deploy.registered.driver"; @@ -219,7 +219,7 @@ public class DuccHttpClient { return pn; } - private void addCommonHeaders( IMetaCasTransaction transaction ) { + private void addCommonHeaders( IMetaTaskTransaction transaction ) { String location = "addCommonHeaders"; transaction.setRequesterAddress(getIP()); transaction.setRequesterNodeName(getNodeName()); @@ -246,9 +246,9 @@ public class DuccHttpClient { } - public IMetaCasTransaction execute( IMetaCasTransaction transaction, HttpPost postMethod ) throws Exception { + public IMetaTaskTransaction execute( IMetaTaskTransaction transaction, HttpPost postMethod ) throws Exception { Exception lastError = null; - IMetaCasTransaction reply=null; + IMetaTaskTransaction reply=null; addCommonHeaders(transaction); transaction.setDirection(Direction.Request); @@ -302,8 +302,8 @@ public class DuccHttpClient { logger.error("execute", null, "Thread:"+Thread.currentThread().getId()+" ERRR::Content causing error:"+content,ex); throw ex; } - if ( o instanceof IMetaCasTransaction) { - reply = (MetaCasTransaction)o; + if ( o instanceof IMetaTaskTransaction) { + reply = (MetaTaskTransaction)o; } else { throw new InvalidClassException("Expected IMetaCasTransaction - Instead Received "+o.getClass().getName()); } @@ -325,7 +325,7 @@ public class DuccHttpClient { } } - private HttpResponse retryUntilSuccessfull(IMetaCasTransaction transaction, HttpPost postMethod) throws Exception { + private HttpResponse retryUntilSuccessfull(IMetaTaskTransaction transaction, HttpPost postMethod) throws Exception { HttpResponse response=null; // Only one thread attempts recovery. Other threads will block here // until connection to the remote is restored. @@ -370,7 +370,7 @@ public class DuccHttpClient { //client.setTimeout(30000); client.initialize(args[0]); int minor = 0; - IMetaCasTransaction transaction = new MetaCasTransaction(); + IMetaTaskTransaction transaction = new MetaTaskTransaction(); AtomicInteger seq = new AtomicInteger(0); TransactionId tid = new TransactionId(seq.incrementAndGet(), minor); transaction.setTransactionId(tid); @@ -382,7 +382,7 @@ public class DuccHttpClient { // send a request to JD and wait for a reply transaction = client.execute(transaction, postMethod); // The JD may not provide a Work Item to process. - if ( transaction.getMetaCas()!= null) { + if ( transaction.getMetaTask()!= null) { // Confirm receipt of the CAS. transaction.setType(Type.Ack); //command = Type.Ack.name(); @@ -395,11 +395,11 @@ public class DuccHttpClient { //command = Type.End.name(); tid = new TransactionId(seq.incrementAndGet(), minor++); transaction.setTransactionId(tid); - IPerformanceMetrics metricsWrapper = - new PerformanceMetrics(); + // IPerformanceMetrics metricsWrapper = + // new PerformanceMetrics(); - metricsWrapper.set(Arrays.asList(new Properties())); - transaction.getMetaCas().setPerformanceMetrics(metricsWrapper); + //metricsWrapper.set(Arrays.asList(new Properties())); + //transaction.getMetaTask().setPerformanceMetrics(metricsWrapper); transaction = client.execute(transaction, postMethod); Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original) +++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Tue Jul 17 13:49:47 2018 @@ -24,6 +24,7 @@ import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.SocketTimeoutException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -33,12 +34,15 @@ import java.util.concurrent.atomic.Atomi import org.apache.http.client.methods.HttpPost; import org.apache.uima.ducc.common.utils.DuccLogger; -import org.apache.uima.ducc.container.net.iface.IMetaCas; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type; -import org.apache.uima.ducc.container.net.impl.MetaCasTransaction; -import org.apache.uima.ducc.container.net.impl.PerformanceMetrics; -import org.apache.uima.ducc.container.net.impl.TransactionId; +import org.apache.uima.ducc.ps.net.iface.IMetaTask; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type; +import org.apache.uima.ducc.ps.net.impl.MetaTaskTransaction; + +import org.apache.uima.ducc.ps.net.impl.TransactionId; +import org.apache.uima.ducc.ps.service.processor.IServiceResultSerializer; +import org.apache.uima.ducc.ps.service.processor.uima.utils.PerformanceMetrics; +import org.apache.uima.ducc.ps.service.processor.uima.utils.UimaResultDefaultSerializer; import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess; import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState; @@ -52,8 +56,8 @@ public class HttpWorkerThread implements private Object processorInstance = null; private static AtomicInteger IdGenerator = new AtomicInteger(); - private Map<String, IMetaCasTransaction> transactionMap = - new ConcurrentHashMap<String, IMetaCasTransaction>(); + private Map<String, IMetaTaskTransaction> transactionMap = + new ConcurrentHashMap<String, IMetaTaskTransaction>(); static AtomicInteger maxFrameworkFailures; private int maxFrameworkErrors = 2; // default // define what happens to this jvm when process() method fails. @@ -63,7 +67,7 @@ public class HttpWorkerThread implements public HttpWorkerThread(JobProcessComponent component, DuccHttpClient httpClient, Object processorInstance, CountDownLatch workerThreadCount, - CountDownLatch threadReadyCount, Map<String, IMetaCasTransaction> transactionMap, + CountDownLatch threadReadyCount, Map<String, IMetaTaskTransaction> transactionMap, AtomicInteger maxFrameworkFailures ) { this.duccComponent = component; this.httpClient = httpClient; @@ -85,10 +89,10 @@ public class HttpWorkerThread implements } } - public IMetaCasTransaction getWork(HttpPost postMethod, int major, int minor) throws Exception { + public IMetaTaskTransaction getWork(HttpPost postMethod, int major, int minor) throws Exception { String command=""; - IMetaCasTransaction transaction = new MetaCasTransaction(); + IMetaTaskTransaction transaction = new MetaTaskTransaction(); try { TransactionId tid = new TransactionId(major, minor); transaction.setTransactionId(tid); @@ -102,16 +106,16 @@ public class HttpWorkerThread implements transaction = httpClient.execute(transaction, postMethod); //httpClient.testConnection(); // The JD may not provide a Work Item to process. - if ( transaction != null && transaction.getMetaCas()!= null) { - logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaCas().getSystemKey()); + if ( transaction != null && transaction.getMetaTask()!= null) { + logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd WI:"+transaction.getMetaTask().getSystemKey()); // Confirm receipt of the CAS. transaction.setType(Type.Ack); command = Type.Ack.name(); tid = new TransactionId(major, minor++); transaction.setTransactionId(tid); - logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Sending ACK request - WI:"+transaction.getMetaCas().getSystemKey()); + logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" Sending ACK request - WI:"+transaction.getMetaTask().getSystemKey()); transaction = httpClient.execute(transaction, postMethod); - if ( transaction.getMetaCas() == null) { + if ( transaction.getMetaTask() == null) { // this can be the case when a JD receives ACK late logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" ACK reply recv'd, however there is no MetaCas. The JD Cancelled the transaction"); } else { @@ -179,7 +183,7 @@ public class HttpWorkerThread implements // allow agent some time to process FailedInitialization event Thread.sleep(2000); } catch( Exception e) {} - System.exit(1); + // System.exit(1); /* *****************************************/ /* *****************************************/ /* *****************************************/ @@ -211,7 +215,7 @@ public class HttpWorkerThread implements logger.info("HttpWorkerThread.run()", null, "Begin Processing Work Items - Thread Id:"+Thread.currentThread().getId()); try { - IMetaCasTransaction transaction=null; + IMetaTaskTransaction transaction=null; int major = 0; int minor = 0; // Enter process loop. Stop this thread on the first process error. @@ -239,7 +243,7 @@ public class HttpWorkerThread implements // done. In such case, reduce frequency of Get requests // by sleeping in between Get's. Eventually the OR will // deallocate this process and the thread will exit - if ( transaction.getMetaCas() == null || transaction.getMetaCas().getUserSpaceCas() == null) { + if ( transaction.getMetaTask() == null || transaction.getMetaTask().getUserSpaceTask() == null) { logger.info("run", null, "Client is out of work - will retry quietly every",duccComponent.getThreadSleepTime()/1000,"secs."); // Retry at the start of this block as another thread may have just exited with work @@ -247,7 +251,7 @@ public class HttpWorkerThread implements synchronized (HttpWorkerThread.class) { while(duccComponent.isRunning() ) { transaction = getWork(postMethod, major, ++minor); - if ( transaction.getMetaCas() != null && transaction.getMetaCas().getUserSpaceCas() != null ) { + if ( transaction.getMetaTask() != null && transaction.getMetaTask().getUserSpaceTask() != null ) { logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" work flow has restarted"); break; } @@ -273,7 +277,7 @@ public class HttpWorkerThread implements // that key to allow us to look up original transaction so that // we can send reset request to the JD. String key = (String) - getKeyMethod.invoke(processorInstance, transaction.getMetaCas().getUserSpaceCas()); + getKeyMethod.invoke(processorInstance, transaction.getMetaTask().getUserSpaceTask()); if ( key != null ) { // add transaction under th transactionMap.put(key, transaction); @@ -286,7 +290,7 @@ public class HttpWorkerThread implements // using java reflection, call process to analyze the CAS. While // we are blocking, user code may issue investment reset asynchronously. List<Properties> metrics = (List<Properties>)processMethod. - invoke(processorInstance, transaction.getMetaCas().getUserSpaceCas()); + invoke(processorInstance, transaction.getMetaTask().getUserSpaceTask()); // *********************************** if ( key != null ) { // process ended we no longer expect investment reset from user @@ -295,10 +299,29 @@ public class HttpWorkerThread implements } logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed"); - PerformanceMetrics metricsWrapper = - new PerformanceMetrics(); - metricsWrapper.set(metrics); - transaction.getMetaCas().setPerformanceMetrics(metricsWrapper); + //PerformanceMetrics metricsWrapper = + // new PerformanceMetrics(); + // metricsWrapper.set(metrics); + IServiceResultSerializer deserializer = + new UimaResultDefaultSerializer(); + + /* + p.setProperty("name", metrics.getName()); + p.setProperty("uniqueName", metrics.getUniqueName()); + p.setProperty("analysisTime", + String.valueOf(metrics.getAnalysisTime())); + p.setProperty("numProcessed", + String.valueOf(metrics.getNumProcessed())); + + */ + List<PerformanceMetrics> pmList = new ArrayList<PerformanceMetrics>(); + for( Properties p : metrics) { + PerformanceMetrics pm = + new PerformanceMetrics(p.getProperty("name"), p.getProperty("uniqueName"), Long.parseLong(p.getProperty("analysisTime"))); + pmList.add(pm); + } + + transaction.getMetaTask().setPerformanceMetrics(deserializer.serialize(pmList)); } catch( InvocationTargetException ee) { @@ -315,7 +338,7 @@ public class HttpWorkerThread implements if ( !duccComponent.isRunning() ) { break; } - IMetaCas mc = transaction.getMetaCas(); + IMetaTask mc = transaction.getMetaTask(); //byte[] serializedException = null; Method getLastSerializedErrorMethod = processorInstance.getClass().getDeclaredMethod("getLastSerializedError"); byte[] serializedException = @@ -342,10 +365,10 @@ public class HttpWorkerThread implements transaction.getMetaCas().setUserSpaceException(baos.toByteArray()); */ logger.error("run", null, ee); - transaction.getMetaCas().setUserSpaceException(serializedException); + transaction.getMetaTask().setUserSpaceException(serializedException); } // Dont return serialized CAS to reduce the msg size - transaction.getMetaCas().setUserSpaceCas(null); + transaction.getMetaTask().setUserSpaceTask(null); transaction.setType(Type.End); //String command = Type.End.name(); @@ -369,7 +392,7 @@ public class HttpWorkerThread implements String wid = null; try { - wid = transaction.getMetaCas().getSystemKey(); + wid = transaction.getMetaTask().getSystemKey(); } catch( Exception e) { } Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original) +++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Tue Jul 17 13:49:47 2018 @@ -43,8 +43,8 @@ import org.apache.uima.ducc.common.compo import org.apache.uima.ducc.common.container.FlagsHelper; import org.apache.uima.ducc.common.main.DuccService; import org.apache.uima.ducc.common.utils.DuccLogger; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type; import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState; public class JobProcessComponent extends AbstractDuccComponent @@ -67,8 +67,8 @@ implements IJobProcessor{ Lock stateLock = new ReentrantLock(); private volatile boolean uimaASJob=false; - Map<String, IMetaCasTransaction> transactionMap = - new ConcurrentHashMap<String, IMetaCasTransaction>(); + Map<String, IMetaTaskTransaction> transactionMap = + new ConcurrentHashMap<String, IMetaTaskTransaction>(); final static Lock lock = new ReentrantLock();; @@ -164,10 +164,10 @@ implements IJobProcessor{ public void resetInvestment(String key) throws Exception { if ( httpClient != null && transactionMap.containsKey(key) ) { // Fetch a transaction object associated with a WI id (key) - IMetaCasTransaction transaction = transactionMap.get(key); + IMetaTaskTransaction transaction = transactionMap.get(key); HttpPost postMethod = new HttpPost(httpClient.getJdUrl()); // Dont return serialized CAS to reduce the msg size - transaction.getMetaCas().setUserSpaceCas(null); + transaction.getMetaTask().setUserSpaceTask(null); transaction.setType(Type.InvestmentReset); // Set request timeout @@ -177,13 +177,13 @@ implements IJobProcessor{ // loaded into the user container from which this call originated. while( isRunning() ) { try { - logger.info("resetInvestment", null, "User Requested Investment Reset - sending request to JD - WI:"+transaction.getMetaCas().getSystemKey()+" user key:"+key); + logger.info("resetInvestment", null, "User Requested Investment Reset - sending request to JD - WI:"+transaction.getMetaTask().getSystemKey()+" user key:"+key); httpClient.execute(transaction, postMethod); break; } catch(SocketTimeoutException e) { - logger.info("resetInvestment", null, "Timeout while waiting for Investment Reset response from JD - retrying - WI:"+transaction.getMetaCas().getSystemKey()); + logger.info("resetInvestment", null, "Timeout while waiting for Investment Reset response from JD - retrying - WI:"+transaction.getMetaTask().getSystemKey()); } catch(Exception e) { - logger.info("resetInvestment", null, "Error while trying send Investment Reset request to JD. Returning to the caller (no retries) WI:"+transaction.getMetaCas().getSystemKey()); + logger.info("resetInvestment", null, "Error while trying send Investment Reset request to JD. Returning to the caller (no retries) WI:"+transaction.getMetaTask().getSystemKey()); logger.info("resetInvestment", null, e); throw new RuntimeException("Unable to deliver Investment Reset request to JD due to "+e.getCause().getMessage()); } Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/JobDriverReport.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/JobDriverReport.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/JobDriverReport.java (original) +++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/JobDriverReport.java Tue Jul 17 13:49:47 2018 @@ -35,7 +35,7 @@ import org.apache.uima.ducc.container.jd import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo.CompletionType; import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo; import org.apache.uima.ducc.container.jd.mh.iface.IWorkItemInfo; -import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.JdState; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.JdState; import org.apache.uima.ducc.transport.event.common.DuccPerWorkItemStatistics; import org.apache.uima.ducc.transport.event.common.DuccProcessWorkItems; import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType; Modified: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java?rev=1836115&r1=1836114&r2=1836115&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java (original) +++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/DuccJobService.java Tue Jul 17 13:49:47 2018 @@ -19,6 +19,7 @@ package org.apache.uima.ducc.user.common.main; import java.io.File; +import java.io.IOException; import java.lang.reflect.Method; import java.net.MalformedURLException; import java.net.URL; @@ -29,7 +30,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.uima.ducc.user.common.investment.Investment; -import org.apache.uima.ducc.user.jp.iface.IProcessContainer; /** * Main program that is used to launch Job Process(JP). @@ -38,10 +38,9 @@ import org.apache.uima.ducc.user.jp.ifac public class DuccJobService { boolean DEBUG = false; private Investment investment = null; - private Method stopMethod; - private Object duccContainerInstance; private Logger logger = Logger.getLogger(DuccJobService.class.getName()); - + private IServiceWrapper service = null; + public static URLClassLoader create(String classPath) throws MalformedURLException { return create(classPath.split(":")); @@ -88,94 +87,88 @@ public class DuccJobService { } } } - + private void addUrlsToSystemLoader(URL[] urls) throws IOException { + URLClassLoader systemClassLoader = (URLClassLoader) ClassLoader + .getSystemClassLoader(); + try { + Method method = URLClassLoader.class.getDeclaredMethod("addURL", + new Class[] { URL.class }); + method.setAccessible(true); // is normally "protected" + for (URL url : urls) { + method.invoke(systemClassLoader, new Object[] { url }); + } + } catch (Throwable t) { + t.printStackTrace(); + throw new IOException( + "Error, could not add URL to system classloader"); + } + } + private URL[] getUrlsFromDuccClasspath(String[] duccClassPath) + throws MalformedURLException { + ArrayList<URL> urlList = new ArrayList<URL>(duccClassPath.length); + for (String element : duccClassPath) { + if (element.endsWith("*")) { + File dir = new File(element.substring(0, element.length() - 1)); + File[] files = dir.listFiles(); // Will be null if missing or + // not a dir + if (files != null) { + for (File f : files) { + if (f.getName().endsWith(".jar")) { + urlList.add(f.toURI().toURL()); + } + } + } + } else { + File f = new File(element); + if (f.exists()) { + urlList.add(f.toURI().toURL()); + } + } + } + URL[] urls = new URL[urlList.size()]; + return urlList.toArray(urls); + } public void start(String[] args) throws Exception { try { investment = new Investment(); - - // cache current context classloader - ClassLoader sysCL = Thread.currentThread().getContextClassLoader(); // Fetch a classpath for the fenced Ducc container String duccContainerClasspath = System.getProperty("ducc.deploy.DuccClasspath"); - URLClassLoader ucl = create(duccContainerClasspath); + if ( duccContainerClasspath != null ) { + URL[] urls = getUrlsFromDuccClasspath(duccContainerClasspath.split(":")); + addUrlsToSystemLoader(urls); + } if (System.getProperty("ducc.debug") != null) { DEBUG = true; } if (DEBUG) { - dump(ucl, 4); + dump((URLClassLoader) ClassLoader + .getSystemClassLoader(), 4); } - - // Load the DuccService class and find its methods of interest - Class<?> duccServiceClass = ucl.loadClass("org.apache.uima.ducc.common.main.DuccService"); - Method bootMethod = duccServiceClass.getMethod("boot", String[].class); - Method setProcessorMethod = duccServiceClass.getMethod("setProcessor", Object.class, String[].class); - Method registerInvestmentInstanceMethod = duccServiceClass.getMethod("registerInvestmentInstance", Object.class); - Method startMethod = duccServiceClass.getMethod("start"); - stopMethod = duccServiceClass.getMethod("stop"); - - // Establish user's logger early to prevent the DUCC code from accidentally doing so - logger.log(Level.INFO, ">>>>>>>>> Booting Ducc Container"); - - HashMap<String, String> savedPropsMap = hideLoggingProperties(); // Ensure DUCC doesn't try to use the user's logging setup - - // Construct & initialize Ducc fenced container. - // It calls component's Configuration class - Thread.currentThread().setContextClassLoader(ucl); - duccContainerInstance = duccServiceClass.newInstance(); - bootMethod.invoke(duccContainerInstance, (Object) args); - - logger.log(Level.INFO, "<<<<<<<< Ducc Container booted"); - restoreLoggingProperties(savedPropsMap); // May not be necessary as user's logger has been established - - // below property is set by component's Configuration class. It can also - // be provided on the command line in case a custom processor is needed. - String processorClass = System.getProperty("ducc.deploy.JpProcessorClass"); - - // Instantiate process container where the actual analysis will be done. - // Currently there are three containers: - // 1 - UimaProcessContainer - used for pieces parts (UIMA only) - // 2 - UimaASProcessContainer - used for DD jobs - // 3 - UimaASServiceContainer - used for UIMA-AS based services - // - // NOTE: the container class is loaded by the main System classloader - // and requires uima-ducc-user jar to be in the System classpath. - // -------------------------------------------------------------------- - // load the process container class using the initial system class loader - Class<?> processorClz = sysCL.loadClass(processorClass); - IProcessContainer pc = (IProcessContainer) processorClz.newInstance(); - - logger.log(Level.INFO, ">>>>>>>>> Running Ducc Container"); - - // Call DuccService.setProcessor() to hand-off instance of the - // process container to the component along with this process args - setProcessorMethod.invoke(duccContainerInstance, pc, args); - - // Hand-off investment object - registerInvestmentInstanceMethod.invoke(duccContainerInstance, investment); - - // Call DuccService.start() to initialize the process and begin processing - startMethod.invoke(duccContainerInstance); + service = ServiceFactory.newService(); + service.initialize(args); + service.start(); + +// HashMap<String, String> savedPropsMap = hideLoggingProperties(); // Ensure DUCC doesn't try to use the user's logging setup +// restoreLoggingProperties(savedPropsMap); // May not be necessary as user's logger has been established logger.log(Level.INFO, "<<<<<<<< Ducc Container ended"); } catch( Throwable t) { - t.printStackTrace(); - System.out.println("Exiting Process Due to Unrecoverable Error"); + logger.log(Level.SEVERE, "<<<<<<<< Exiting Process Due to Unrecoverable Error:", t); Runtime.getRuntime().halt(99); // User Error } } /* - * Terminate the service connection + * Terminate service connection */ public void stop() { try { - stopMethod.invoke(duccContainerInstance); + service.stop(); } catch( Throwable t) { logger.log(Level.SEVERE, "Stop failed"); - t.printStackTrace(); } } @@ -211,5 +204,7 @@ public class DuccJobService { e.printStackTrace(); } } + + } Added: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/IServiceWrapper.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/IServiceWrapper.java?rev=1836115&view=auto ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/IServiceWrapper.java (added) +++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/IServiceWrapper.java Tue Jul 17 13:49:47 2018 @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.uima.ducc.user.common.main; + +public interface IServiceWrapper { + public void initialize(String[] args) throws Exception; + public void start() throws Exception; + public void stop() throws Exception; +} Added: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/PullServiceWrapper.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/PullServiceWrapper.java?rev=1836115&view=auto ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/PullServiceWrapper.java (added) +++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/PullServiceWrapper.java Tue Jul 17 13:49:47 2018 @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.uima.ducc.user.common.main; + +import java.lang.reflect.Method; + +/* + * Implements dynamic loading of a service class, initializes its instance + * and starts it. + */ + +public class PullServiceWrapper implements IServiceWrapper{ + + private Object instance = null; + private Class<?> duccServiceClass = null; + + public void initialize(String[] args) throws Exception{ + duccServiceClass = + ClassLoader.getSystemClassLoader().loadClass("org.apache.uima.ducc.ps.service.main.ServiceWrapper"); + instance = duccServiceClass.newInstance(); + Method initMethod = duccServiceClass.getMethod("initialize", String[].class); + initMethod.invoke(instance, (Object) args); + } + public void start() throws Exception { + Method startMethod = duccServiceClass.getMethod("start"); + startMethod.invoke(instance); + } + + public void stop() throws Exception { + Method stopMethod = duccServiceClass.getMethod("stop"); + stopMethod.invoke(instance); + + } +} Added: uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/ServiceFactory.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/ServiceFactory.java?rev=1836115&view=auto ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/ServiceFactory.java (added) +++ uima/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/common/main/ServiceFactory.java Tue Jul 17 13:49:47 2018 @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.uima.ducc.user.common.main; + +public class ServiceFactory { + + private ServiceFactory() {} + + public static IServiceWrapper newService() throws ClassNotFoundException, InstantiationException, IllegalAccessException { + Class<?> serviceClass = null; + String serviceClassName = null; + // use custom service class or default. Custom class must implement ServiceWrapper + if ( ( serviceClassName = System.getProperty("ducc.service.class")) != null ) { + serviceClass = Thread.currentThread().getContextClassLoader().loadClass(serviceClassName); + return (IServiceWrapper)serviceClass.newInstance(); + } else { + // use default + return new PullServiceWrapper(); + } + } +}
