Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java?rev=1858488&r1=1858487&r2=1858488&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java Wed May 1 13:42:39 2019 @@ -1,20 +1,20 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.uima.ducc.ps.service.transport.http; @@ -30,6 +30,7 @@ import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.apache.http.HttpEntity; @@ -61,358 +62,408 @@ import org.apache.uima.ducc.ps.service.u import org.apache.uima.util.Level; import org.apache.uima.util.Logger; +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.xml.DomDriver; + public class HttpServiceTransport implements IServiceTransport { - private Logger logger = UIMAFramework.getLogger(HttpServiceTransport.class); - private HttpClient httpClient = null; - private PoolingHttpClientConnectionManager cMgr = null; - private int clientMaxConnections = 1; - private int clientMaxConnectionsPerRoute = 1; - private int clientMaxConnectionsPerHostPort = 0; - private ReentrantLock registryLookupLock = new ReentrantLock(); - private long threadSleepTime=1000; // millis - private final String nodeIP; - private final String nodeName; - private final String pid; - private ITargetURI currentTargetUrl = new NoOpTargetURI(); - private static final String NA="N/A"; - private TransportStats stats = new TransportStats(); - private IRegistryClient registryClient; - // holds reference to HttpPost object for every thread. Key=thread id - private Map<Long,HttpPost> httpPostMap = - new HashMap<>(); - private volatile boolean stopping = false; - private volatile boolean running = false; - private volatile boolean log = true; - - public HttpServiceTransport(IRegistryClient registryClient, int scaleout) throws ServiceException { - this.registryClient = registryClient; - clientMaxConnections = scaleout; - - - if ( Objects.isNull(System.getenv("DUCC_IP")) || Objects.isNull(System.getenv("DUCC_NODENAME"))) { - try { - nodeIP = InetAddress.getLocalHost().getHostAddress(); - nodeName=InetAddress.getLocalHost().getCanonicalHostName(); - - } catch( UnknownHostException e) { - throw new RuntimeException(new TransportException("HttpServiceTransport.ctor - Unable to determine Host Name and IP",e)); - } - - } else { - // Use agent provided node identity. This is important when running in a sim mode - // where nodes are virtual. - nodeIP = System.getenv("DUCC_IP"); - nodeName = System.getenv("DUCC_NODENAME"); - - } - pid = getProcessIP(NA); - } - private HttpPost getPostMethodForCurrentThread() { - HttpPost postMethod; - if ( !httpPostMap.containsKey(Thread.currentThread().getId())) { - // each thread needs its own PostMethod - postMethod = - new HttpPost(currentTargetUrl.asString()); - httpPostMap.put(Thread.currentThread().getId(),postMethod); - } else { - postMethod = httpPostMap.get(Thread.currentThread().getId()); - } - return postMethod; - } - private String getProcessIP(final String fallback) { - // the following code returns '<pid>@<hostname>' - String name = ManagementFactory.getRuntimeMXBean().getName(); - int pos = name.indexOf('@'); - - if (pos < 1) { - // pid not found - return fallback; - } - - try { - return Long.toString(Long.parseLong(name.substring(0, pos))); - } catch (NumberFormatException e) { - // ignore - } - return fallback; - } - private void lookupNewTarget() { - registryLookupLock.lock(); - while( !stopping ) { - try { - String newTarget = registryClient.lookUp(currentTargetUrl.asString()); - currentTargetUrl = TargetURIFactory.newTarget(newTarget); - break; - } catch( Exception e) { - synchronized (httpClient) { - - try { - httpClient.wait(threadSleepTime); - } catch( InterruptedException ex) { - Thread.currentThread().interrupt(); - break; - } - } - } - } - if (registryLookupLock.isHeldByCurrentThread()) { - registryLookupLock.unlock(); - } - } - public void addRequestorInfo(IMetaTaskTransaction transaction) { - transaction.setRequesterAddress(nodeIP); - transaction.setRequesterNodeName(nodeName); - transaction.setRequesterProcessId(Integer.valueOf(pid)); - transaction.setRequesterThreadId((int)Thread.currentThread().getId()); - if ( logger.isLoggable(Level.FINE )) { - logger.log(Level.FINE,"ip:"+transaction.getRequesterAddress()); - logger.log(Level.FINE, "nodeName:"+transaction.getRequesterNodeName()); - logger.log(Level.FINE, "processName:"+transaction.getRequesterProcessName()); - logger.log(Level.FINE,"processId:"+transaction.getRequesterProcessId()); - logger.log(Level.FINE, "threadId:"+transaction.getRequesterThreadId()); - - } - - } - public void initialize() throws ServiceInitializationException { - - // use plugged in registry to lookup target to connect to. - // Sets global: currentTarget - lookupNewTarget(); - - cMgr = new PoolingHttpClientConnectionManager(); - - if (clientMaxConnections > 0) { - cMgr.setMaxTotal(clientMaxConnections); - } - // Set default max connections per route - if (clientMaxConnectionsPerRoute > 0) { - cMgr.setDefaultMaxPerRoute(clientMaxConnectionsPerRoute); - } - HttpHost httpHost = new HttpHost(currentTargetUrl.asString(), Integer.valueOf(currentTargetUrl.getPort()), currentTargetUrl.getContext()); - if (clientMaxConnectionsPerHostPort > 0) { - cMgr.setMaxPerRoute(new HttpRoute(httpHost), clientMaxConnectionsPerHostPort); - } - - httpClient = HttpClients.custom().setConnectionManager(cMgr).build(); - running = true; - - } - private void addCommonHeaders( HttpPost method ) { - synchronized( HttpServiceTransport.class ) { - - method.setHeader("IP", nodeIP); - method.setHeader("Hostname", nodeName); - method.setHeader("ThreadID", - String.valueOf(Thread.currentThread().getId())); - method.setHeader("PID", pid); - - } - - } - - private HttpEntity wrapRequest(String serializedRequest) { - return new StringEntity(serializedRequest, ContentType.APPLICATION_XML); - } - - private boolean isRunning() { - return running; - } - - private IMetaTaskTransaction retryUntilSuccessfull(String request, HttpPost postMethod) { - IMetaTaskTransaction response=null; - - // retry until service is stopped - while (isRunning()) { - try { - response = doPost(postMethod); - break; - - } catch (TransportException | IOException | URISyntaxException exx) { - try { - Thread.sleep(threadSleepTime); - } catch( InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - lookupNewTarget(); - - } - return response; - - } - private IMetaTaskTransaction doPost(HttpPost postMethod) throws URISyntaxException, IOException, TransportException { - postMethod.setURI(new URI(currentTargetUrl.asString())); - - IMetaTaskTransaction metaTransaction=null; - HttpResponse response = httpClient.execute(postMethod); - // if ( stopping ) { - // throw new TransportException("Service stopping - rejecting request"); - //} - HttpEntity entity = response.getEntity(); - String serializedResponse = EntityUtils.toString(entity); - Object transaction=null; - try { - transaction = XStreamUtils.unmarshall(serializedResponse); - } catch(Exception e) { - logger.log(Level.WARNING,"Process Thread:"+Thread.currentThread().getId()+" Error while deserializing response with XStream",e); - throw new TransportException(e); - } - if ( Objects.isNull(transaction)) { - throw new InvalidClassException( - "Expected IMetaTaskTransaction - Instead Received NULL"); - - } else if ( !(transaction instanceof IMetaTaskTransaction) ) { - throw new InvalidClassException( - "Expected IMetaTaskTransaction - Instead Received " + transaction.getClass().getName()); - } - metaTransaction = (IMetaTaskTransaction) transaction; - - StatusLine statusLine = response.getStatusLine(); - if (statusLine.getStatusCode() != 200 ) { - // all IOExceptions are retried - throw new IOException( - "Unexpected HttpClient response status:"+statusLine+ " Content causing error:"+serializedResponse); - } - - stats.incrementSuccessCount(); - return metaTransaction; - } - /** - * Dispatches request to remote driver via doPost(). Its synchronized to prevent over-running the driver with - * requests from multiple threads. When the transport fails sending GET/ACK/END a single thread - * will try to recover connection and send the request. - * - */ - @Override - public synchronized IMetaTaskTransaction dispatch(String serializedRequest) throws TransportException { - //if ( stopping ) { - // throw new IllegalStateException("Service transport has been stopped, unable to dispatch request"); - // } - IMetaTaskTransaction transaction=null; - HttpEntity e = wrapRequest(serializedRequest); - // Each thread has its own HttpPost method. If current thread - // doesnt have one, it will be created and added to the local - // Map. Subsequent requests will fetch it from the map using - // current thread ID as a key. - HttpPost postMethod = getPostMethodForCurrentThread(); - addCommonHeaders(postMethod); - postMethod.setEntity(e); - try { - String simulatedException; - // To test transport errors add command line option -DMockHttpPostError=exception where - // exception is one of the following Strings: - // - // IOException, - // SocketException, - // UnknownHostException, - // NoRouteToHostException, - // NoHttpResponseException, - // HttpHostConnectException, - // URISyntaxException - // Use JUnit test JunitTransoirtTestCase to test the above errors - - if ( ( simulatedException = System.getProperty("MockHttpPostError")) != null ) { - HttpClientExceptionGenerator mockExceptionGenerator = - new HttpClientExceptionGenerator(simulatedException); - mockExceptionGenerator.throwSimulatedException(); - } else { - transaction = doPost(postMethod); - } - } catch( IOException | URISyntaxException ex) { - if ( stopping ) { - // looks like the process is in the shutdown mode. Log an exception and dont retry - logger.log(Level.INFO,"Process Thread:"+Thread.currentThread().getId()+" - Process is already stopping - Caught Exception while calling doPost() \n"+ex); - throw new TransportException(ex); - } else { - if ( log ) { - log = false; - stats.incrementErrorCount(); - logger.log(Level.WARNING, this.getClass().getName()+".dispatch() >>>>>>>>>> Handling Exception \n"+ex); - logger.log(Level.INFO, ">>>>>>>>>> Unable to communicate with target:"+currentTargetUrl.asString()+" - retrying until successfull - with "+threadSleepTime/1000+" seconds wait between retries "); - } - transaction = retryUntilSuccessfull(serializedRequest, postMethod); - log = true; - logger.log(Level.INFO, "Established connection to target:"+currentTargetUrl.asString()); - } - - - } finally { - postMethod.releaseConnection(); - } - return transaction; - - } - - public void stop(boolean quiesce) { - - stopping = true; - running = false; - // Use System.out since the logger's ShutdownHook may have closed streams - System.out.println(Utils.getTimestamp()+">>>>>>> "+Utils.getShortClassname(this.getClass())+" stop() called - mode:"+(quiesce==true?"quiesce":"stop")); - logger.log(Level.INFO,this.getClass().getName()+" stop() called"); - if ( !quiesce && cMgr != null ) { - cMgr.shutdown(); - System.out.println(Utils.getTimestamp()+">>>>>>> "+Utils.getShortClassname(this.getClass())+" stopped connection mgr"); - logger.log(Level.INFO,this.getClass().getName()+" stopped connection mgr"); - - } - } - public static void main(String[] args) { - - } - - public static class HttpClientExceptionGenerator { - public enum ERROR{ IOException, SocketException, UnknownHostException, NoRouteToHostException,NoHttpResponseException, HttpHostConnectException, URISyntaxException}; - - Exception exceptionClass=null; - - public HttpClientExceptionGenerator(String exc) { - - for( ERROR e : ERROR.values()) { - if ( exc != null && e.name().equals(exc)) { - switch(e) { - case IOException: - exceptionClass = new IOException("Simulated IOException"); - break; - case URISyntaxException: - exceptionClass = new URISyntaxException("", "Simulated URISyntaxException"); - break; - case NoRouteToHostException: - exceptionClass = new NoRouteToHostException("Simulated NoRouteToHostException"); - break; - case NoHttpResponseException: - exceptionClass = new NoHttpResponseException("Simulated NoHttpResponseException"); - break; - case SocketException: - exceptionClass = new SocketException("Simulated SocketException"); - break; - case UnknownHostException: - exceptionClass = new UnknownHostException("Simulated UnknownHostException"); - break; - - default: - - - } - if ( exceptionClass != null ) { - break; - } - } - } - } - public void throwSimulatedException() throws IOException, URISyntaxException { - if ( exceptionClass != null ) { - if ( exceptionClass instanceof IOException ) { - throw (IOException)exceptionClass; - } else if ( exceptionClass instanceof URISyntaxException ) { - throw (URISyntaxException)exceptionClass; - } - - } - } - - - } + private Logger logger = UIMAFramework.getLogger(HttpServiceTransport.class); + + private HttpClient httpClient = null; + + private PoolingHttpClientConnectionManager cMgr = null; + + private int clientMaxConnections = 1; + + private int clientMaxConnectionsPerRoute = 1; + + private int clientMaxConnectionsPerHostPort = 0; + + private ReentrantLock registryLookupLock = new ReentrantLock(); + + private long threadSleepTime = 1000; // millis + + private final String nodeIP; + + private final String nodeName; + + private final String pid; + + private ITargetURI currentTargetUrl = new NoOpTargetURI(); + + private static final String NA = "N/A"; + + private TransportStats stats = new TransportStats(); + + private IRegistryClient registryClient; + + // holds reference to HttpPost object for every thread. Key=thread id + private Map<Long, HttpPost> httpPostMap = new HashMap<>(); + + private volatile boolean stopping = false; + + private volatile boolean running = false; + + private volatile boolean log = true; + + private AtomicLong xstreamTime = new AtomicLong(); + // private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long, + // XStream>>() { + // @Override + // protected HashMap<Long, XStream> initialValue() { + // return new HashMap<Long, XStream>(); + // } + // }; + + public HttpServiceTransport(IRegistryClient registryClient, int scaleout) + throws ServiceException { + this.registryClient = registryClient; + clientMaxConnections = scaleout; + + try { + nodeIP = InetAddress.getLocalHost().getHostAddress(); + nodeName = InetAddress.getLocalHost().getCanonicalHostName(); + pid = getProcessIP(NA); + } catch (UnknownHostException e) { + throw new RuntimeException(new TransportException( + "HttpServiceTransport.ctor - Unable to determine Host Name and IP", e)); + } + + } + + private HttpPost getPostMethodForCurrentThread() { + HttpPost postMethod; + if (!httpPostMap.containsKey(Thread.currentThread().getId())) { + // each thread needs its own PostMethod + postMethod = new HttpPost(currentTargetUrl.asString()); + httpPostMap.put(Thread.currentThread().getId(), postMethod); + } else { + postMethod = httpPostMap.get(Thread.currentThread().getId()); + } + return postMethod; + } + + private String getProcessIP(final String fallback) { + // the following code returns '<pid>@<hostname>' + String name = ManagementFactory.getRuntimeMXBean().getName(); + int pos = name.indexOf('@'); + + if (pos < 1) { + // pid not found + return fallback; + } + + try { + return Long.toString(Long.parseLong(name.substring(0, pos))); + } catch (NumberFormatException e) { + // ignore + } + return fallback; + } + + private void lookupNewTarget() { + registryLookupLock.lock(); + while (!stopping) { + try { + String newTarget = registryClient.lookUp(currentTargetUrl.asString()); + currentTargetUrl = TargetURIFactory.newTarget(newTarget); + break; + } catch (Exception e) { + synchronized (httpClient) { + + try { + httpClient.wait(threadSleepTime); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + if (registryLookupLock.isHeldByCurrentThread()) { + registryLookupLock.unlock(); + } + } + + public void addRequestorInfo(IMetaTaskTransaction transaction) { + transaction.setRequesterAddress(nodeIP); + transaction.setRequesterNodeName(nodeName); + transaction.setRequesterProcessId(Integer.valueOf(pid)); + transaction.setRequesterThreadId((int) Thread.currentThread().getId()); + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, "ip:" + transaction.getRequesterAddress()); + logger.log(Level.FINE, "nodeName:" + transaction.getRequesterNodeName()); + logger.log(Level.FINE, "processName:" + transaction.getRequesterProcessName()); + logger.log(Level.FINE, "processId:" + transaction.getRequesterProcessId()); + logger.log(Level.FINE, "threadId:" + transaction.getRequesterThreadId()); + + } + + } + + public void initialize() throws ServiceInitializationException { + + // use plugged in registry to lookup target to connect to. + // Sets global: currentTarget + lookupNewTarget(); + + cMgr = new PoolingHttpClientConnectionManager(); + + if (clientMaxConnections > 0) { + cMgr.setMaxTotal(clientMaxConnections); + } + // Set default max connections per route + if (clientMaxConnectionsPerRoute > 0) { + cMgr.setDefaultMaxPerRoute(clientMaxConnectionsPerRoute); + } + HttpHost httpHost = new HttpHost(currentTargetUrl.asString(), + Integer.valueOf(currentTargetUrl.getPort()), currentTargetUrl.getContext()); + if (clientMaxConnectionsPerHostPort > 0) { + cMgr.setMaxPerRoute(new HttpRoute(httpHost), clientMaxConnectionsPerHostPort); + } + + httpClient = HttpClients.custom().setConnectionManager(cMgr).build(); + running = true; + + } + + private void addCommonHeaders(HttpPost method) { + // synchronized( HttpServiceTransport.class ) { + + method.setHeader("IP", nodeIP); + method.setHeader("Hostname", nodeName); + method.setHeader("ThreadID", String.valueOf(Thread.currentThread().getId())); + method.setHeader("PID", pid); + + // } + + } + + private HttpEntity wrapRequest(String serializedRequest) { + return new StringEntity(serializedRequest, ContentType.APPLICATION_XML); + } + + private boolean isRunning() { + return running; + } + + private IMetaTaskTransaction retryUntilSuccessfull(String request, HttpPost postMethod, + ThreadLocal<HashMap<Long, XStream>> localXStream) { + IMetaTaskTransaction response = null; + + // retry until service is stopped + while (isRunning()) { + try { + response = doPost(postMethod, localXStream); + break; + + } catch (TransportException | IOException | URISyntaxException exx) { + try { + Thread.sleep(threadSleepTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + lookupNewTarget(); + + } + return response; + + } + + private IMetaTaskTransaction doPost(HttpPost postMethod, + ThreadLocal<HashMap<Long, XStream>> localXStream) + throws URISyntaxException, IOException, TransportException { + postMethod.setURI(new URI(currentTargetUrl.asString())); + + IMetaTaskTransaction metaTransaction = null; + HttpResponse response = httpClient.execute(postMethod); + if (stopping) { + throw new TransportException("Service stopping - rejecting request"); + } + HttpEntity entity = response.getEntity(); + String serializedResponse = EntityUtils.toString(entity); + Object transaction = null; + try { + long t1 = System.currentTimeMillis(); + // transaction = XStreamUtils.unmarshall(serializedResponse); + transaction = localXStream.get().get(Thread.currentThread().getId()) + .fromXML(serializedResponse); + xstreamTime.addAndGet((System.currentTimeMillis() - t1)); + } catch (Exception e) { + logger.log(Level.WARNING, "Process Thread:" + Thread.currentThread().getId() + + " Error while deserializing response with XStream", e); + throw new TransportException(e); + } + if (Objects.isNull(transaction)) { + throw new InvalidClassException("Expected IMetaTaskTransaction - Instead Received NULL"); + + } else if (!(transaction instanceof IMetaTaskTransaction)) { + throw new InvalidClassException("Expected IMetaTaskTransaction - Instead Received " + + transaction.getClass().getName()); + } + metaTransaction = (IMetaTaskTransaction) transaction; + + StatusLine statusLine = response.getStatusLine(); + if (statusLine.getStatusCode() != 200) { + // all IOExceptions are retried + throw new IOException("Unexpected HttpClient response status:" + statusLine + + " Content causing error:" + serializedResponse); + } + + stats.incrementSuccessCount(); + return metaTransaction; + } + + /** + * Dispatches request to remote driver via doPost(). Its synchronized to prevent over-running the + * driver with requests from multiple threads. When the transport fails sending GET/ACK/END a + * single thread will try to recover connection and send the request. + * + */ + @Override + public IMetaTaskTransaction dispatch(String serializedRequest, + ThreadLocal<HashMap<Long, XStream>> localXStream) throws TransportException { + if (stopping) { + throw new IllegalStateException( + "Service transport has been stopped, unable to dispatch request"); + } + IMetaTaskTransaction transaction = null; + HttpEntity e = wrapRequest(serializedRequest); + // Each thread has its own HttpPost method. If current thread + // doesnt have one, it will be created and added to the local + // Map. Subsequent requests will fetch it from the map using + // current thread ID as a key. + HttpPost postMethod = getPostMethodForCurrentThread(); + addCommonHeaders(postMethod); + postMethod.setEntity(e); + try { + String simulatedException; + // To test transport errors add command line option + // -DMockHttpPostError=exception where + // exception is one of the following Strings: + // + // IOException, + // SocketException, + // UnknownHostException, + // NoRouteToHostException, + // NoHttpResponseException, + // HttpHostConnectException, + // URISyntaxException + // Use JUnit test JunitTransoirtTestCase to test the above errors + + if ((simulatedException = System.getProperty("MockHttpPostError")) != null) { + HttpClientExceptionGenerator mockExceptionGenerator = new HttpClientExceptionGenerator( + simulatedException); + mockExceptionGenerator.throwSimulatedException(); + } else { + transaction = doPost(postMethod, localXStream); + } + } catch (IOException | URISyntaxException ex) { + if (stopping) { + // looks like the process is in the shutdown mode. Log an exception and dont + // retry + logger.log(Level.INFO, "Process Thread:" + Thread.currentThread().getId() + + " - Process is already stopping - Caught Exception while calling doPost() \n" + + ex); + throw new TransportException(ex); + } else { + if (log) { + log = false; + stats.incrementErrorCount(); + logger.log(Level.WARNING, + this.getClass().getName() + ".dispatch() >>>>>>>>>> Handling Exception \n" + ex); + logger.log(Level.INFO, + ">>>>>>>>>> Unable to communicate with target:" + currentTargetUrl.asString() + + " - retrying until successfull - with " + threadSleepTime / 1000 + + " seconds wait between retries "); + } + transaction = retryUntilSuccessfull(serializedRequest, postMethod, localXStream); + log = true; + logger.log(Level.INFO, "Established connection to target:" + currentTargetUrl.asString()); + } + + } finally { + postMethod.releaseConnection(); + } + return transaction; + + } + + public void stop(boolean quiesce) { + + stopping = true; + running = false; + // Use System.out since the logger's ShutdownHook may have closed streams + System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(this.getClass()) + + " stop() called - mode:" + (quiesce == true ? "quiesce" : "stop")); + logger.log(Level.INFO, this.getClass().getName() + " stop() called"); + System.out.println(" ########################################3 Total time in XStream:" + + (xstreamTime.get() / 1000) + " secs"); + if (!quiesce && cMgr != null) { + cMgr.shutdown(); + System.out.println(Utils.getTimestamp() + ">>>>>>> " + + Utils.getShortClassname(this.getClass()) + " stopped connection mgr"); + logger.log(Level.INFO, this.getClass().getName() + " stopped connection mgr"); + + } + } + + public static void main(String[] args) { + + } + + public static class HttpClientExceptionGenerator { + public enum ERROR { + IOException, SocketException, UnknownHostException, NoRouteToHostException, NoHttpResponseException, HttpHostConnectException, URISyntaxException + }; + + Exception exceptionClass = null; + + public HttpClientExceptionGenerator(String exc) { + + for (ERROR e : ERROR.values()) { + if (exc != null && e.name().equals(exc)) { + switch (e) { + case IOException: + exceptionClass = new IOException("Simulated IOException"); + break; + case URISyntaxException: + exceptionClass = new URISyntaxException("", "Simulated URISyntaxException"); + break; + case NoRouteToHostException: + exceptionClass = new NoRouteToHostException("Simulated NoRouteToHostException"); + break; + case NoHttpResponseException: + exceptionClass = new NoHttpResponseException("Simulated NoHttpResponseException"); + break; + case SocketException: + exceptionClass = new SocketException("Simulated SocketException"); + break; + case UnknownHostException: + exceptionClass = new UnknownHostException("Simulated UnknownHostException"); + break; + + default: + + } + if (exceptionClass != null) { + break; + } + } + } + } + + public void throwSimulatedException() throws IOException, URISyntaxException { + if (exceptionClass != null) { + if (exceptionClass instanceof IOException) { + throw (IOException) exceptionClass; + } else if (exceptionClass instanceof URISyntaxException) { + throw (URISyntaxException) exceptionClass; + } + + } + } + + } }
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java?rev=1858488&r1=1858487&r2=1858488&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java Wed May 1 13:42:39 2019 @@ -21,6 +21,12 @@ package org.apache.uima.ducc.ps; import java.io.BufferedReader; import java.io.IOException; import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -34,6 +40,7 @@ import org.apache.uima.cas.CAS; import org.apache.uima.ducc.ps.net.iface.IMetaTask; import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction; import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Direction; +import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type; import org.apache.uima.ducc.ps.net.impl.MetaTask; import org.apache.uima.ducc.ps.service.transport.XStreamUtils; import org.apache.uima.ducc.ps.service.utils.UimaSerializer; @@ -48,243 +55,441 @@ import org.eclipse.jetty.servlet.Servlet import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.After; +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.xml.DomDriver; + public class Client { - private Server server; - private boolean block = false; - private AtomicLong errorCount = new AtomicLong(); - private final static String app="test"; - private int httpPort = 12222; - private int maxThreads = 50; - private static UimaSerializer uimaSerializer = new UimaSerializer(); - private AtomicInteger correlationIdCounter = - new AtomicInteger(0); - private AtomicInteger atomicCounter = - new AtomicInteger(1); - private AtomicInteger atomicErrorCounter = - new AtomicInteger(10); - private volatile boolean noMoreErrors = false; - protected String getApp() { - return app; - } - protected int getJettyPort() { - while(true) { - ServerSocket socket=null; - try { - socket = new ServerSocket(httpPort); - break; - } catch( IOException e) { - httpPort++; - } finally { - if ( socket != null ) { - try { - socket.close(); - } catch( Exception ee) {} - - } - } - } - return httpPort; - } - protected int getPort() { - - return httpPort; - } - - public void startJetty(boolean block) throws Exception { - this.block = block; - - - QueuedThreadPool threadPool = new QueuedThreadPool(); - if (maxThreads < threadPool.getMinThreads()) { - System.out.println( - "Invalid value for jetty MaxThreads("+maxThreads+") - it should be greater or equal to "+threadPool.getMinThreads()+". Defaulting to jettyMaxThreads="+threadPool.getMaxThreads()); - threadPool.setMaxThreads(threadPool.getMinThreads()); - } else { - threadPool.setMaxThreads(maxThreads); - } - - server = new Server(threadPool); - - // Server connector - ServerConnector connector = new ServerConnector(server); - connector.setPort(getJettyPort()); - server.setConnectors(new Connector[] { connector }); - System.out.println("launching Jetty on Port:"+connector.getPort()); - ServletContextHandler context = new ServletContextHandler( - ServletContextHandler.SESSIONS); - context.setContextPath("/"); - server.setHandler(context); - - context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/"+app); - - - server.start(); - System.out.println("Jetty Started - Waiting for Messages ..."); - } - - @After - public void stopJetty() - { - try - { - if ( server != null ) { - UIMAFramework.getLogger().log(Level.INFO, "Stopping Jetty"); - server.stop(); - - } - } - catch (Exception e) - { - e.printStackTrace(); - } - UIMAFramework.getLogger().log(Level.INFO,"Jetty Stopped"); - } - public class TaskHandlerServlet extends HttpServlet { - private static final long serialVersionUID = 1L; - - public TaskHandlerServlet() { - } - - protected void doPost(HttpServletRequest request, - HttpServletResponse response) throws ServletException, - IOException { - try { - //System.out.println("Handling HTTP Post Request"); - //long post_stime = System.nanoTime(); - StringBuilder sb = new StringBuilder(); - BufferedReader reader = request.getReader(); - String line; - while ((line = reader.readLine()) != null) { - sb.append(line); - } - String content = sb.toString().trim(); - - //System.out.println( "Http Request Body:::"+String.valueOf(content)); - - - String nodeIP = request.getHeader("IP"); - String nodeName = request.getHeader("Hostname"); - String threadID = request.getHeader("ThreadID"); - String pid = request.getHeader("PID"); - System.out.println( "Sender ID:::Node IP"+nodeIP+" Node Name:"+nodeName+" PID:"+pid+" ThreadID:"+threadID); - - IMetaTaskTransaction imt = null; - - imt = (IMetaTaskTransaction) XStreamUtils.unmarshall(content); - IMetaTaskTransaction.Type type = imt.getType(); - switch(type) { - case Get: - System.out.println("---- Driver handling GET Request -- Thread:"+Thread.currentThread().getId()); - imt.setMetaTask(getMetaMetaCas()); - imt.getMetaTask().setAppData("CorrelationID-"+correlationIdCounter.incrementAndGet()); - if ( System.getProperty("simulate.no.work") == null || noMoreErrors) { - imt.getMetaTask().setUserSpaceTask(getSerializedCAS()); - } else { - System.out.println("---- Driver handling GET Request -- Client Out of Tasks -- Thread:"+Thread.currentThread().getId()); - imt.getMetaTask().setUserSpaceTask(null); - if ( atomicErrorCounter.decrementAndGet() == 0 ) { - noMoreErrors = true; - } - } - // handleMetaCasTransationGet(trans, taskConsumer); - break; - case Ack: - System.out.println("---- Driver handling ACK Request - "); - //handleMetaCasTransationAck(trans, taskConsumer); - break; - case End: - System.out.println("---- Driver handling END Request - "+imt.getMetaTask().getAppData()); - //handleMetaCasTransationEnd(trans, taskConsumer); - if ( imt.getMetaTask().getUserSpaceException() != null ) { - System.out.println("Client received error#"+errorCount.incrementAndGet()); - } - break; - case InvestmentReset: - // handleMetaCasTransationInvestmentReset(trans, rwt); - break; - default: - break; - } - // process service request - //taskProtocolHandler.handle(imt); - - //long marshall_stime = System.nanoTime(); - // setup reply - - imt.setDirection(Direction.Response); - - response.setStatus(HttpServletResponse.SC_OK); - - response.setHeader("content-type", "text/xml"); - String body = XStreamUtils.marshall(imt); - - if (block ) { - synchronized(this) { - this.wait(0); - } - - } - System.out.println("Sending response"); - response.getWriter().write(body); - - - //response.getWriter().write(content); - } catch( InterruptedException e) { - Thread.currentThread().interrupt(); - } - catch (Throwable e) { - e.printStackTrace(); - throw new ServletException(e); - } - } - - } - public long getErrorCount() { - return errorCount.get(); - } - private IMetaTask getMetaCas(String serializedCas) { - if ( serializedCas == null ) { - return null; - } - return new MetaTask(atomicCounter.incrementAndGet(), "", serializedCas); - } - - private IMetaTask getMetaMetaCas() { - //IMetaMetaCas mmc = new MetaMetaCas(); - - String serializedCas = "Bogus"; - - IMetaTask metaCas = getMetaCas(serializedCas); - - // mmc.setMetaCas(metaCas); - //return mmc; - return metaCas; - } - public String getSerializedCAS() { - //logger.log(Level.INFO,"getSerializedCAS() Call "+seqno.incrementAndGet() - // + " - from "+taskConsumer.getType()+":"+taskConsumer.getHostName()+"-"+taskConsumer.getPid()+"-"+taskConsumer.getThreadId() ); - String serializedCas = null; - try { - CAS cas = null; - cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, null); - cas.setDocumentLanguage("en"); - - //logger.log(Level.INFO,"delivering: " + text); - cas.setDocumentText("TEST"); -// cas.setDocumentText("100 "+seqno.incrementAndGet()+" 1000 0"); - - serializedCas = serialize(cas); - cas.reset(); - cas.release(); - - } catch( Exception e) { - //logger.log(Level.WARNING,"Error",e); - } - - return serializedCas; - } - private String serialize(CAS cas) throws Exception { - String serializedCas = uimaSerializer.serializeCasToXmi(cas); - return serializedCas; - } + private Server server; + + private boolean block = false; + + private AtomicLong errorCount = new AtomicLong(); + + private AtomicLong taskCount = new AtomicLong(); + + private final static String app = "test"; + + private int httpPort = 12222; + + private int maxThreads = 50; + + private volatile boolean print = true; + + private static UimaSerializer uimaSerializer = new UimaSerializer(); + + private AtomicInteger correlationIdCounter = new AtomicInteger(0); + + private AtomicInteger atomicCounter = new AtomicInteger(1); + + private AtomicInteger atomicErrorCounter = new AtomicInteger(16); + + private volatile boolean noMoreErrors = false; + + Map<String, List<ThreadMetrics>> metrics = new ConcurrentHashMap<>(); + + private AtomicLong idleTime = new AtomicLong(); + + private AtomicLong lastTime = new AtomicLong(); + + private AtomicLong xstreamTime = new AtomicLong(); + + private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long, XStream>>() { + @Override + protected HashMap<Long, XStream> initialValue() { + return new HashMap<Long, XStream>(); + } + }; + + protected String getApp() { + return app; + } + + protected int getJettyPort() { + while (true) { + ServerSocket socket = null; + try { + socket = new ServerSocket(httpPort); + break; + } catch (IOException e) { + httpPort++; + } finally { + if (socket != null) { + try { + socket.close(); + } catch (Exception ee) { + } + + } + } + } + return httpPort; + } + + protected int getPort() { + + return httpPort; + } + + public void startJetty(boolean block) throws Exception { + this.block = block; + + QueuedThreadPool threadPool = new QueuedThreadPool(); + if (maxThreads < threadPool.getMinThreads()) { + System.out.println("Invalid value for jetty MaxThreads(" + maxThreads + + ") - it should be greater or equal to " + threadPool.getMinThreads() + + ". Defaulting to jettyMaxThreads=" + threadPool.getMaxThreads()); + threadPool.setMaxThreads(threadPool.getMinThreads()); + } else { + threadPool.setMaxThreads(maxThreads); + } + + server = new Server(threadPool); + + // Server connector + ServerConnector connector = new ServerConnector(server); + System.out.println(">>>> Jetty Acceptors:" + connector.getAcceptors()); + + connector.setPort(getJettyPort()); + server.setConnectors(new Connector[] { connector }); + System.out.println("launching Jetty on Port:" + connector.getPort()); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + server.setHandler(context); + + context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/" + app); + + server.start(); + System.out.println("Jetty Started - Waiting for Messages ..."); + } + + @After + public void stopJetty() { + try { + if (server != null) { + UIMAFramework.getLogger().log(Level.INFO, "Stopping Jetty"); + server.stop(); + + } + // System.out.println(">>>>>>>>>>>>>>> IDLE TIME:"+(idleTime.longValue()/1000)); + + if (print) { + print = false; + + for (Entry<String, List<ThreadMetrics>> me : metrics.entrySet()) { + StringBuilder sb = new StringBuilder(); + sb.append("Service Thread Id:").append(me.getKey()).append(" Number of Tasks Processed:") + .append(me.getValue().size()); + int i = 0; + for (ThreadMetrics tm : me.getValue()) { + long analysisTime = 0; + try { + analysisTime = Long.parseLong(tm.getAnalysisTime()); + } catch (Exception e) { + } + ThreadMetrics previous = null; + if (i > 0) { + previous = me.getValue().get(i - 1); + } + sb.append("\n\tTask ").append(tm.getCorrelationId()).// append(" Ack Time:"). + append(tm.getAckTime() - tm.getGetTime()).append(" Get-Ack-End:") + .append(tm.getEndTime() - tm.getGetTime()).append(" ms") + .append(" Analysis Time:").append(tm.getAnalysisTime()).append(" Overhead:") + .append((tm.getEndTime() - tm.getGetTime()) - analysisTime); + if (previous != null) { + sb.append(" Idle Time:").append(tm.getGetTime() - previous.getEndTime()); + } + i++; + + } + + System.out.println(sb.toString()); + System.out.println(">>>>> Total Tasks Processed:" + taskCount + " Client Time in xstream:" + + (xstreamTime.get() / 1000)); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + UIMAFramework.getLogger().log(Level.INFO, "Jetty Stopped"); + } + + public class TaskHandlerServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + + public TaskHandlerServlet() { + } + + protected void doPost(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + try { + // System.out.println("Handling HTTP Post Request"); + // long post_stime = System.nanoTime(); + StringBuilder sb = new StringBuilder(); + BufferedReader reader = request.getReader(); + String line; + while ((line = reader.readLine()) != null) { + sb.append(line); + } + String content = sb.toString().trim(); + + // System.out.println( "Http Request Body:::"+String.valueOf(content)); + + if (localXStream.get().get(Thread.currentThread().getId()) == null) { + localXStream.get().put(Thread.currentThread().getId(), XStreamUtils.getXStreamInstance());// new + // XStream(new + // DomDriver())); + } + + String nodeIP = request.getHeader("IP"); + String nodeName = request.getHeader("Hostname"); + String threadID = request.getHeader("ThreadID"); + String pid = request.getHeader("PID"); + // System.out.println( "Sender ID:::Node IP"+nodeIP+" Node Name:"+nodeName+" PID:"+pid+" + // ThreadID:"+threadID); + + IMetaTaskTransaction imt = null; + long t1 = System.currentTimeMillis(); + // imt = (IMetaTaskTransaction) XStreamUtils.unmarshall(content); + imt = (IMetaTaskTransaction) localXStream.get().get(Thread.currentThread().getId()) + .fromXML(content); + xstreamTime.addAndGet(System.currentTimeMillis() - t1); + IMetaTaskTransaction.Type type = imt.getType(); + switch (type) { + case Get: + // idleTime.addAndGet(System.currentTimeMillis() - lastTime.longValue()); + // lastTime.set(System.currentTimeMillis()); + + taskCount.incrementAndGet(); + // System.out.println("---- Driver handling GET Request -- Thread:"+threadID); + List<ThreadMetrics> tmList = null; + if (metrics.containsKey(threadID)) { + tmList = metrics.get(threadID); + } else { + tmList = new ArrayList<>(); + metrics.put(threadID, tmList); + + } + ThreadMetrics tm = new ThreadMetrics(); + tm.setGetTime(System.currentTimeMillis()); + tmList.add(tm); + + imt.setMetaTask(getMetaMetaCas()); + + imt.getMetaTask().setAppData("CorrelationID-" + correlationIdCounter.incrementAndGet()); + tm.setCorrelationId(imt.getMetaTask().getAppData()); + if (System.getProperty("simulate.no.work") == null || noMoreErrors) { + imt.getMetaTask().setUserSpaceTask(getSerializedCAS()); + } else { + System.out + .println("---- Driver handling GET Request -- Client Out of Tasks -- Thread:" + + threadID); + imt.getMetaTask().setUserSpaceTask(null); + if (atomicErrorCounter.decrementAndGet() == 0) { + noMoreErrors = true; + } + } + // handleMetaCasTransationGet(trans, taskConsumer); + break; + case Ack: + // System.out.println("---- Driver handling ACK Request - "); + List<ThreadMetrics> tmList2 = metrics.get(threadID); + for (ThreadMetrics tm2 : tmList2) { + if (imt.getMetaTask().getAppData().equals(tm2.getCorrelationId())) { + tm2.setAckTime(System.currentTimeMillis()); + break; + } + } + + // handleMetaCasTransationAck(trans, taskConsumer); + break; + case End: + // System.out.println("---- Driver handling END Request - + // "+imt.getMetaTask().getAppData()); + List<ThreadMetrics> tmList3 = metrics.get(threadID); + for (ThreadMetrics tm3 : tmList3) { + if (imt.getMetaTask().getAppData().equals(tm3.getCorrelationId())) { + tm3.setEndTime(System.currentTimeMillis()); + if (imt.getMetaTask().getPerformanceMetrics() != null) { + String metrics = imt.getMetaTask().getPerformanceMetrics(); + int start = metrics.indexOf("<analysisTime>") + "<analysisTime>".length(); + int end = metrics.indexOf("</analysisTime>"); + String analysisTime = metrics.substring(start, end); + // System.out.println(">>>>>>>>>>>>>>>>> Analysis Time:"+analysisTime); + tm3.setAnalysisTime(analysisTime); + } + break; + } + } + // handleMetaCasTransationEnd(trans, taskConsumer); + if (imt.getMetaTask().getUserSpaceException() != null) { + System.out.println("Client received error#" + errorCount.incrementAndGet()); + } + + break; + case InvestmentReset: + // handleMetaCasTransationInvestmentReset(trans, rwt); + break; + default: + break; + } + // process service request + // taskProtocolHandler.handle(imt); + + // long marshall_stime = System.nanoTime(); + // setup reply + + imt.setDirection(Direction.Response); + + response.setStatus(HttpServletResponse.SC_OK); + + response.setHeader("content-type", "text/xml"); + // String body = XStreamUtils.marshall(imt); + String body = localXStream.get().get(Thread.currentThread().getId()).toXML(imt); + if (block) { + synchronized (this) { + this.wait(0); + } + + } + + // System.out.println("Sending response"); + response.getWriter().write(body); + + // response.getWriter().write(content); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Throwable e) { + e.printStackTrace(); + throw new ServletException(e); + } + } + + } + + public long getErrorCount() { + return errorCount.get(); + } + + private IMetaTask getMetaCas(String serializedCas) { + if (serializedCas == null) { + return null; + } + return new MetaTask(atomicCounter.incrementAndGet(), "", serializedCas); + } + + private IMetaTask getMetaMetaCas() { + // IMetaMetaCas mmc = new MetaMetaCas(); + + String serializedCas = "Bogus"; + + IMetaTask metaCas = getMetaCas(serializedCas); + + // mmc.setMetaCas(metaCas); + // return mmc; + return metaCas; + } + + public String getSerializedCAS() { + // logger.log(Level.INFO,"getSerializedCAS() Call "+seqno.incrementAndGet() + // + " - from + // "+taskConsumer.getType()+":"+taskConsumer.getHostName()+"-"+taskConsumer.getPid()+"-"+taskConsumer.getThreadId() + // ); + String serializedCas = null; + try { + CAS cas = null; + cas = CasCreationUtils.createCas(new TypeSystemDescription_impl(), null, null); + cas.setDocumentLanguage("en"); + + // logger.log(Level.INFO,"delivering: " + text); + cas.setDocumentText("TEST"); + // cas.setDocumentText("100 "+seqno.incrementAndGet()+" 1000 0"); + + serializedCas = serialize(cas); + cas.reset(); + cas.release(); + + } catch (Exception e) { + // logger.log(Level.WARNING,"Error",e); + } + + return serializedCas; + } + + private String serialize(CAS cas) throws Exception { + String serializedCas = uimaSerializer.serializeCasToXmi(cas); + return serializedCas; + } + + private class ThreadMetrics { + long getTime; + + long ackTime; + + long endTime; + + long idleTime; + + long lastTime; + + String correlationId; + + String analysisTime; + + public long getLastTime() { + return lastTime; + } + + public void setLastTime(long lastTime) { + this.lastTime = lastTime; + } + + public String getAnalysisTime() { + return analysisTime; + } + + public void setAnalysisTime(String analysisTime) { + this.analysisTime = analysisTime; + } + + public long getIdleTime() { + return idleTime; + } + + public void setIdleTime(long idleTime) { + this.idleTime = idleTime; + } + + public String getCorrelationId() { + return correlationId; + } + + public void setCorrelationId(String correlationId) { + this.correlationId = correlationId; + } + + public long getGetTime() { + return getTime; + } + + public void setGetTime(long getTime) { + this.getTime = getTime; + } + + public long getAckTime() { + return ackTime; + } + + public void setAckTime(long ackTime) { + this.ackTime = ackTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + } } Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java Wed May 1 13:42:39 2019 @@ -21,6 +21,7 @@ package org.apache.uima.ducc.ps.service; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.uima.ducc.ps.Client; import org.apache.uima.ducc.ps.service.builders.PullServiceStepBuilder; @@ -30,269 +31,264 @@ import org.apache.uima.ducc.ps.service.p import org.junit.Test; public class JunitPullServiceTestCase extends Client { - private static final long DELAY=5000; - CountDownLatch threadsReady; - CountDownLatch stopLatch; - { - // static initializer sets amount of time the service delays - // sending READY to a monitor - System.setProperty("ducc.service.init.delay", "3000"); - } - @Test - public void testPullService() throws Exception { - System.out.println("----------------- testPullService -------------------"); - int scaleout = 2; - super.startJetty(false); // don't block - String analysisEngineDescriptor = "TestAAE"; - System.setProperty("ducc.deploy.JpType", "uima"); - - IServiceProcessor processor = new - UimaServiceProcessor(analysisEngineDescriptor); - - String tasURL = "http://localhost:"+super.getPort()+"/test"; - - IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) - .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) - .withOptionalsDone().build(); - - try { - service.initialize(); - Timer fTimer = new Timer("testPullService Timer"); - // after 5secs stop the pull service - fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY); - - service.start(); - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - } finally { - super.stopJetty(); - } - } - @Test - public void testPullServiceQuiesce() throws Exception { - System.out.println("----------------- testPullServiceQuiesce -------------------"); - int scaleout = 2; - super.startJetty(false); // don't block - String analysisEngineDescriptor = "TestAAE"; - System.setProperty("ducc.deploy.JpType", "uima"); - IServiceProcessor processor = new - UimaServiceProcessor(analysisEngineDescriptor); - - String tasURL = "http://localhost:"+super.getPort()+"/test"; - - IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) - .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) - .withOptionalsDone().build(); - - try { - service.initialize(); - Timer fTimer = new Timer("testPullService Timer"); - // after 5secs stop the pull service - fTimer.schedule(new MyTimerTask(service, fTimer, true), DELAY); - - service.start(); - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - }finally { - super.stopJetty(); - } - } - - @Test - public void testPullServiceTimeout() throws Exception { - System.out.println("----------------- testPullServiceTimeout -------------------"); - super.startJetty(true); // true=client blocks all POST requests - int scaleout = 12; - String analysisEngineDescriptor = "TestAAE"; - IServiceProcessor processor = new - UimaServiceProcessor(analysisEngineDescriptor); - - String tasURL ="http://localhost:"+super.getPort()+"/test"; - - IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) - .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) - .withOptionalsDone().build(); - - try { - service.initialize(); - System.out.println("----------- Starting Service ....."); - Timer fTimer = new Timer(); - //after 10sec stop the service - fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY); - - service.start(); - - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - }finally { - super.stopJetty(); - } - } - - @Test - public void testStopOnFirstError() throws Exception { - System.out.println("----------------- testStopOnFirstError -------------------"); - int scaleout = 10; - super.startJetty(false); // don't block - String analysisEngineDescriptor = "NoOpAE"; - System.setProperty("ducc.deploy.JpType", "uima"); - - IServiceProcessor processor = - new UimaServiceProcessor(analysisEngineDescriptor); - // fail on 1st error - processor.setErrorHandlerWindow(1, 5); - - String tasURL = "http://localhost:"+super.getPort()+"/test"; - - IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) - .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) - .withOptionalsDone().build(); - - try { - System.setProperty("ProcessFail","2"); - service.initialize(); - - service.start(); - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - } finally { - System.getProperties().remove("ProcessFail"); - super.stopJetty(); - } - } - @Test - public void testTerminateOn2ErrorsInWindowOf5() throws Exception { - System.out.println("----------------- testTerminateOn2ErrorsInWindowOf5 -------------------"); - int scaleout = 10; - super.startJetty(false); // don't block - String analysisEngineDescriptor = "NoOpAE"; - System.setProperty("ducc.deploy.JpType", "uima"); - - IServiceProcessor processor = - new UimaServiceProcessor(analysisEngineDescriptor); - // fail on 2nd error in a window of 5 - processor.setErrorHandlerWindow(2, 5); - String tasURL = "http://localhost:"+super.getPort()+"/test"; - - IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) - .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) - .withOptionalsDone().build(); - - try { - // fail task#1 and task#3 which should stop the test - System.setProperty("ProcessFail","1,3"); - service.initialize(); - - service.start(); - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - } finally { - System.getProperties().remove("ProcessFail"); - super.stopJetty(); - } - } - @Test - public void testProcessFailureDefaultErrorHandler() throws Exception { - System.out.println("----------------- testProcessFailureDefaultErrorHandler -------------------"); - int scaleout = 14; - super.startJetty(false); // don't block - String analysisEngineDescriptor = "NoOpAE"; - IServiceProcessor processor = new - UimaServiceProcessor(analysisEngineDescriptor); - - String tasURL = "http://localhost:"+super.getPort()+"/test"; - - IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) - .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) - .withOptionalsDone().build(); - - try { - // fail on 2nd task. This should terminate the test - System.setProperty("ProcessFail","20"); - service.initialize(); - Timer fTimer = new Timer("testPullService Timer"); - // after 5secs stop the pull service - fTimer.schedule(new MyTimerTask(service, fTimer, false), 20000); - - service.start(); - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - } finally { - System.getProperties().remove("ProcessFail"); - super.stopJetty(); - } - } - - /* - @Test - public void testPullServiceBadClientURL() throws Exception { - int scaleout = 2; - super.startJetty(false); // don't block - String analysisEngineDescriptor = "TestAAE"; - IServiceProcessor processor = new - UimaServiceProcessor(analysisEngineDescriptor); - - String tasURL ="http://localhost2:8080/test"; - - IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) - .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) - .withOptionalsDone().build(); - - try { - service.initialize(); - service.start(); - - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - } - } - */ - class MyTimerTask extends TimerTask { - final IService service; - final Timer fTimer; - final boolean quiesce; - - MyTimerTask(IService service, Timer fTimer, boolean quiesce) { - this.service = service; - this.fTimer = fTimer; - this.quiesce = quiesce; - } - - @Override - - public void run() { - this.cancel(); - fTimer.purge(); - fTimer.cancel(); - System.out.println("Timmer popped - stopping service"); - if (quiesce ) { - service.quiesceAndStop(); - } else { - service.stop(); - } - } + private static final long DELAY = 20000; - } + CountDownLatch threadsReady; + + CountDownLatch stopLatch; + { + // static initializer sets amount of time the service delays + // sending READY to a monitor + System.setProperty("ducc.service.init.delay", "3000"); + } + + @Test + public void testPullService() throws Exception { + System.out.println("----------------- testPullService -------------------"); + int scaleout = 20; + super.startJetty(false); // don't block + // String analysisEngineDescriptor = "TestAAE"; + String analysisEngineDescriptor = "NoOpAE"; + System.setProperty("ducc.deploy.JpType", "uima"); + // System.setProperty("simulate.no.work", "3"); UNCOMMENT TO SIMULATE NO WORK + IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor); + + String tasURL = "http://localhost:" + super.getPort() + "/test"; + + IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) + .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) + .withOptionalsDone().build(); + long start = System.currentTimeMillis(); + try { + service.initialize(); + Timer fTimer = new Timer("testPullService Timer"); + // after 5secs stop the pull service + fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY); + + service.start(); + + } catch (ServiceInitializationException e) { + throw e; + } catch (Exception e) { + throw e; + } finally { + long end = System.currentTimeMillis(); + super.stopJetty(); + + System.out.println("###################### start-end clock time:" + (end - start) / 1000); + } + } + + @Test + public void testPullServiceQuiesce() throws Exception { + System.out.println("----------------- testPullServiceQuiesce -------------------"); + int scaleout = 12; + super.startJetty(false); // don't block + String analysisEngineDescriptor = "TestAAE"; + System.setProperty("ducc.deploy.JpType", "uima"); + IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor); + + String tasURL = "http://localhost:" + super.getPort() + "/test"; + + IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) + .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) + .withOptionalsDone().build(); + + try { + service.initialize(); + Timer fTimer = new Timer("testPullService Timer"); + // after 5secs stop the pull service + fTimer.schedule(new MyTimerTask(service, fTimer, true), DELAY); + + service.start(); + + } catch (ServiceInitializationException e) { + throw e; + } catch (Exception e) { + throw e; + } finally { + super.stopJetty(); + } + } + + @Test + public void testPullServiceTimeout() throws Exception { + System.out.println("----------------- testPullServiceTimeout -------------------"); + super.startJetty(true); // true=client blocks all POST requests + int scaleout = 12; + String analysisEngineDescriptor = "TestAAE"; + IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor); + + String tasURL = "http://localhost:" + super.getPort() + "/test"; + + IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) + .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) + .withOptionalsDone().build(); + + try { + service.initialize(); + System.out.println("----------- Starting Service ....."); + Timer fTimer = new Timer(); + // after 10sec stop the service + fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY); + + service.start(); + + } catch (ServiceInitializationException e) { + throw e; + } catch (Exception e) { + throw e; + } finally { + super.stopJetty(); + } + } + + @Test + public void testStopOnFirstError() throws Exception { + System.out.println("----------------- testStopOnFirstError -------------------"); + int scaleout = 10; + super.startJetty(false); // don't block + String analysisEngineDescriptor = "NoOpAE"; + System.setProperty("ducc.deploy.JpType", "uima"); + + IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor); + // fail on 1st error + processor.setErrorHandlerWindow(1, 5); + + String tasURL = "http://localhost:" + super.getPort() + "/test"; + + IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) + .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) + .withOptionalsDone().build(); + + try { + System.setProperty("ProcessFail", "2"); + service.initialize(); + + service.start(); + + } catch (ServiceInitializationException e) { + throw e; + } catch (Exception e) { + throw e; + } finally { + System.getProperties().remove("ProcessFail"); + super.stopJetty(); + } + } + + @Test + public void testTerminateOn2ErrorsInWindowOf5() throws Exception { + System.out.println("----------------- testTerminateOn2ErrorsInWindowOf5 -------------------"); + int scaleout = 10; + super.startJetty(false); // don't block + String analysisEngineDescriptor = "NoOpAE"; + System.setProperty("ducc.deploy.JpType", "uima"); + + IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor); + // fail on 2nd error in a window of 5 + processor.setErrorHandlerWindow(2, 5); + String tasURL = "http://localhost:" + super.getPort() + "/test"; + + IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) + .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) + .withOptionalsDone().build(); + + try { + // fail task#1 and task#3 which should stop the test + System.setProperty("ProcessFail", "1,3"); + service.initialize(); + + service.start(); + + } catch (ServiceInitializationException e) { + throw e; + } catch (Exception e) { + throw e; + } finally { + System.getProperties().remove("ProcessFail"); + super.stopJetty(); + } + } + + @Test + public void testProcessFailureDefaultErrorHandler() throws Exception { + System.out + .println("----------------- testProcessFailureDefaultErrorHandler -------------------"); + int scaleout = 14; + super.startJetty(false); // don't block + String analysisEngineDescriptor = "NoOpAE"; + IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor); + + String tasURL = "http://localhost:" + super.getPort() + "/test"; + + IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) + .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) + .withOptionalsDone().build(); + + try { + // fail on 2nd task. This should terminate the test + System.setProperty("ProcessFail", "20"); + service.initialize(); + Timer fTimer = new Timer("testPullService Timer"); + // after 5secs stop the pull service + fTimer.schedule(new MyTimerTask(service, fTimer, false), 20000); + + service.start(); + + } catch (ServiceInitializationException e) { + throw e; + } catch (Exception e) { + throw e; + } finally { + System.getProperties().remove("ProcessFail"); + super.stopJetty(); + } + } + + /* + * @Test public void testPullServiceBadClientURL() throws Exception { int scaleout = 2; + * super.startJetty(false); // don't block String analysisEngineDescriptor = "TestAAE"; + * IServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor); + * + * String tasURL ="http://localhost2:8080/test"; + * + * IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor) + * .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout) + * .withOptionalsDone().build(); + * + * try { service.initialize(); service.start(); + * + * + * } catch (ServiceInitializationException e) { throw e; } catch (Exception e) { throw e; } } + */ + class MyTimerTask extends TimerTask { + final IService service; + + final Timer fTimer; + + final boolean quiesce; + + MyTimerTask(IService service, Timer fTimer, boolean quiesce) { + this.service = service; + this.fTimer = fTimer; + this.quiesce = quiesce; + } + + @Override + + public void run() { + this.cancel(); + fTimer.purge(); + fTimer.cancel(); + System.out.println("Timmer popped - stopping service"); + if (quiesce) { + service.quiesceAndStop(); + } else { + service.stop(); + } + } + + } } Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java Wed May 1 13:42:39 2019 @@ -28,189 +28,194 @@ import org.apache.uima.ducc.ps.service.e import org.apache.uima.ducc.ps.service.main.ServiceWrapper; import org.junit.Test; -public class JUnitServiceWrapperTestCase extends Client { - private static final long DELAY=5000; - { - // static initializer sets amount of time the service delays - // sending READY to a monitor - System.setProperty("ducc.service.init.delay", "3000"); - } - - - @Test - public void testPullServiceWrapperNoTask() throws Exception { - // make client return null task in response to GET - System.setProperty("simulate.no.work", "3"); - System.setProperty("ducc.process.thread.sleep.time", "1000"); - try { - testPullServiceWrapper(); - } finally { - System.getProperties().remove("simulate.no.work"); - } - } - @Test - public void testPullServiceWrapper() throws Exception { - System.out.println("-------------------------- testPullServiceWrapper ----------------------");; - - //int scaleout = 2; - StateMonitor monitor = new StateMonitor(); - monitor.start(); - System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT")); - super.startJetty(false); // don't block - String analysisEngineDescriptor = "TestAAE"; - System.setProperty("ducc.deploy.JpType", "uima"); - - String tasURL = "http://localhost:"+super.getPort()+"/test"; - try { - System.setProperty("ducc.deploy.JdURL", tasURL); - System.setProperty("ducc.deploy.JpThreadCount","4"); - System.setProperty("ducc.deploy.service.type", "NotesService"); - System.setProperty("ducc.deploy.JpType", "uima"); - - ServiceWrapper service = new ServiceWrapper(); - - Timer fTimer = new Timer("testPullService Timer"); - // after 5secs stop the pull service - fTimer.schedule(new MyTimerTask(service, fTimer), 40000); - - service.initialize(new String[] {analysisEngineDescriptor}); - - service.start(); - - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - } finally { - monitor.stop(); - super.stopJetty(); - - } - } - - @Test - public void testPullServiceWrapperWithProcessFailure() throws Exception { - System.out.println("-------------------------- testPullServiceWrapperWithProcessFailure ----------------------");; - //int scaleout = 2; - StateMonitor monitor = new StateMonitor(); - monitor.start(); - System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT")); - super.startJetty(false); // don't block - String analysisEngineDescriptor = "NoOpAE"; - - String tasURL = "http://localhost:"+super.getPort()+"/test"; - try { - // Force process failure of the first task - System.setProperty("ProcessFail","1"); - - System.setProperty("ducc.deploy.JdURL", tasURL); - System.setProperty("ducc.deploy.JpThreadCount","4"); - System.setProperty("ducc.deploy.service.type", "NotesService"); - System.setProperty("ducc.deploy.JpType", "uima"); - // use default error window (1,1) - ServiceWrapper service = new ServiceWrapper(); - - Timer fTimer = new Timer("testPullService Timer"); - // after 5secs stop the pull service - fTimer.schedule(new MyTimerTask(service, fTimer), 10000); - - service.initialize(new String[] {analysisEngineDescriptor}); - - service.start(); - - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - } finally { - monitor.stop(); - System.getProperties().remove("ProcessFail"); - super.stopJetty(); - } - } - @Test - public void testPullServiceWrapperDDGenerator() throws Exception { - System.out.println("-------------------------- testPullServiceWrapperDDGenerator ----------------------");; - - //int scaleout = 2; - StateMonitor monitor = new StateMonitor(); - monitor.start(); - System.out.println("........... Monitor Port:"+System.getProperty("DUCC_STATE_UPDATE_PORT")); - super.startJetty(false); // don't block - // Dont change the name of TestAAE.xml. This is setup to fail file lookup and force - // generation of AE descriptor. - String analysisEngineDescriptor = "TestAAE.xml"; - System.setProperty("ducc.deploy.JpType", "uima"); - - String tasURL = "http://localhost:"+super.getPort()+"/test"; - try { - - System.setProperty("ducc.deploy.JdURL", tasURL); - System.setProperty("ducc.deploy.JpThreadCount","4"); - System.setProperty("ducc.deploy.service.type", "NotesService"); - System.setProperty("ducc.deploy.JpType", "uima"); - System.setProperty("ducc.deploy.JpAeDescriptor","NoOpAE"); - System.setProperty("ducc.deploy.JobDirectory",System.getProperty("user.dir")); - System.setProperty("ducc.deploy.JpFlowController","org.apache.uima.flow.FixedFlowController"); - System.setProperty("ducc.process.log.dir",System.getProperty("user.dir")); - System.setProperty("ducc.job.id","2000"); - ServiceWrapper service = new ServiceWrapper(); - - Timer fTimer = new Timer("testPullService Timer"); - // after 5secs stop the pull service - fTimer.schedule(new MyTimerTask(service, fTimer), 20000); - - service.initialize(new String[] {analysisEngineDescriptor}); - - service.start(); - - - } catch (ServiceInitializationException e) { - throw e; - } catch (Exception e) { - throw e; - } finally { - monitor.stop(); - super.stopJetty(); - File directory = new File(System.getProperty("user.dir"). - concat("/").concat(System.getProperty("ducc.job.id"))); - - if ( directory.exists() ) { - for (File f : directory.listFiles()) { - if (f.getName().startsWith("uima-ae-")) { - f.delete(); - System.out.println("Removed generated descriptor:"+f.getAbsolutePath()); - } - } - directory.delete(); - - } - - - } - } - class MyTimerTask extends TimerTask { - final ServiceWrapper service; - final Timer fTimer; - - MyTimerTask(ServiceWrapper service, Timer fTimer) { - this.service = service; - this.fTimer = fTimer; - } - - @Override - - public void run() { - this.cancel(); - fTimer.purge(); - fTimer.cancel(); - System.out.println("Timmer popped - stopping service"); - service.stop(); +public class JUnitServiceWrapperTestCase extends Client { + private static final long DELAY = 5000; + { + // static initializer sets amount of time the service delays + // sending READY to a monitor + System.setProperty("ducc.service.init.delay", "3000"); + } + + @Test + public void testPullServiceWrapperNoTask() throws Exception { + // make client return null task in response to GET + System.setProperty("simulate.no.work", "3"); + System.setProperty("ducc.process.thread.sleep.time", "1000"); + try { + testPullServiceWrapper(); + } finally { + System.getProperties().remove("simulate.no.work"); + } + } + + @Test + public void testPullServiceWrapper() throws Exception { + System.out.println("-------------------------- testPullServiceWrapper ----------------------"); + ; + + // int scaleout = 2; + StateMonitor monitor = new StateMonitor(); + monitor.start(); + System.out.println("........... Monitor Port:" + System.getProperty("DUCC_STATE_UPDATE_PORT")); + super.startJetty(false); // don't block + String analysisEngineDescriptor = "TestAAE"; + System.setProperty("ducc.deploy.JpType", "uima"); + + String tasURL = "http://localhost:" + super.getPort() + "/test"; + try { + System.setProperty("ducc.deploy.JdURL", tasURL); + System.setProperty("ducc.deploy.JpThreadCount", "12"); + System.setProperty("ducc.deploy.service.type", "NotesService"); + System.setProperty("ducc.deploy.JpType", "uima"); + + ServiceWrapper service = new ServiceWrapper(); + + Timer fTimer = new Timer("testPullService Timer"); + // after 5secs stop the pull service + fTimer.schedule(new MyTimerTask(service, fTimer), 40000); + + service.initialize(new String[] { analysisEngineDescriptor }); + + service.start(); + + } catch (ServiceInitializationException e) { + throw e; + } catch (Exception e) { + throw e; + } finally { + monitor.stop(); + super.stopJetty(); + + } + } + + @Test + public void testPullServiceWrapperWithProcessFailure() throws Exception { + System.out.println( + "-------------------------- testPullServiceWrapperWithProcessFailure ----------------------"); + ; + // int scaleout = 2; + StateMonitor monitor = new StateMonitor(); + monitor.start(); + System.out.println("........... Monitor Port:" + System.getProperty("DUCC_STATE_UPDATE_PORT")); + super.startJetty(false); // don't block + String analysisEngineDescriptor = "NoOpAE"; + + String tasURL = "http://localhost:" + super.getPort() + "/test"; + try { + // Force process failure of the first task + System.setProperty("ProcessFail", "1"); + + System.setProperty("ducc.deploy.JdURL", tasURL); + System.setProperty("ducc.deploy.JpThreadCount", "4"); + System.setProperty("ducc.deploy.service.type", "NotesService"); + System.setProperty("ducc.deploy.JpType", "uima"); + // use default error window (1,1) + ServiceWrapper service = new ServiceWrapper(); + + Timer fTimer = new Timer("testPullService Timer"); + // after 5secs stop the pull service + fTimer.schedule(new MyTimerTask(service, fTimer), 10000); + + service.initialize(new String[] { analysisEngineDescriptor }); + + service.start(); + + } catch (ServiceInitializationException e) { + throw e; + } catch (Exception e) { + throw e; + } finally { + monitor.stop(); + System.getProperties().remove("ProcessFail"); + super.stopJetty(); + } + } + + @Test + public void testPullServiceWrapperDDGenerator() throws Exception { + System.out.println( + "-------------------------- testPullServiceWrapperDDGenerator ----------------------"); + ; + + // int scaleout = 2; + StateMonitor monitor = new StateMonitor(); + monitor.start(); + System.out.println("........... Monitor Port:" + System.getProperty("DUCC_STATE_UPDATE_PORT")); + super.startJetty(false); // don't block + // Dont change the name of TestAAE.xml. This is setup to fail file lookup and force + // generation of AE descriptor. + String analysisEngineDescriptor = "TestAAE.xml"; + System.setProperty("ducc.deploy.JpType", "uima"); + + String tasURL = "http://localhost:" + super.getPort() + "/test"; + try { + + System.setProperty("ducc.deploy.JdURL", tasURL); + System.setProperty("ducc.deploy.JpThreadCount", "4"); + System.setProperty("ducc.deploy.service.type", "NotesService"); + System.setProperty("ducc.deploy.JpType", "uima"); + System.setProperty("ducc.deploy.JpAeDescriptor", "NoOpAE"); + System.setProperty("ducc.deploy.JobDirectory", System.getProperty("user.dir")); + System.setProperty("ducc.deploy.JpFlowController", + "org.apache.uima.flow.FixedFlowController"); + System.setProperty("ducc.process.log.dir", System.getProperty("user.dir")); + System.setProperty("ducc.job.id", "2000"); + ServiceWrapper service = new ServiceWrapper(); + + Timer fTimer = new Timer("testPullService Timer"); + // after 5secs stop the pull service + fTimer.schedule(new MyTimerTask(service, fTimer), 20000); + + service.initialize(new String[] { analysisEngineDescriptor }); + + service.start(); + + } catch (ServiceInitializationException e) { + throw e; + } catch (Exception e) { + throw e; + } finally { + monitor.stop(); + super.stopJetty(); + File directory = new File( + System.getProperty("user.dir").concat("/").concat(System.getProperty("ducc.job.id"))); + + if (directory.exists()) { + for (File f : directory.listFiles()) { + if (f.getName().startsWith("uima-ae-")) { + f.delete(); + System.out.println("Removed generated descriptor:" + f.getAbsolutePath()); + } + } + directory.delete(); + + } + + } + } + + class MyTimerTask extends TimerTask { + final ServiceWrapper service; + + final Timer fTimer; + + MyTimerTask(ServiceWrapper service, Timer fTimer) { + this.service = service; + this.fTimer = fTimer; + } + + @Override + + public void run() { + this.cancel(); + fTimer.purge(); + fTimer.cancel(); + System.out.println("Timmer popped - stopping service"); + service.stop(); - } + } - } + } } Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java?rev=1858488&r1=1858487&r2=1858488&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java Wed May 1 13:42:39 2019 @@ -24,6 +24,7 @@ import java.lang.management.ManagementFa import java.lang.management.RuntimeMXBean; import java.util.Map; import java.util.Objects; +import java.util.Random; import java.util.concurrent.atomic.AtomicLong; import org.apache.uima.UIMAFramework; @@ -126,7 +127,24 @@ public class NoOpAE extends CasAnnotator } } } + try { + //int n = getRandomNumberInRange(2000,3000); + int n = getRandomNumberInRange(250,450); + //System.out.println(" AE Sleeping for "+n + " millis"); + Thread.sleep(n); + + } catch( InterruptedException e) { + + } } + private static int getRandomNumberInRange(int min, int max) { + if (min >= max) { + throw new IllegalArgumentException("max must be greater than min"); + } + + Random r = new Random(); + return r.nextInt((max - min) + 1) + min; + } }
