Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java?rev=1858488&r1=1858487&r2=1858488&view=diff ============================================================================== --- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java (original) +++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java Wed May 1 13:42:39 2019 @@ -21,6 +21,7 @@ package org.apache.uima.ducc.ps.transpor import java.io.BufferedReader; import java.io.IOException; import java.net.ServerSocket; +import java.util.HashMap; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -46,189 +47,219 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.xml.DomDriver; + public class JunitTransportTestCase { - private Server server; - private final static String app="test"; - private int httpPort = 12222; - private int maxThreads = 20; - private int getJettyPort() { - while(true) { - ServerSocket socket=null; - try { - socket = new ServerSocket(httpPort); - break; - } catch( IOException e) { - httpPort++; - } finally { - if ( socket != null ) { - try { - socket.close(); - } catch( Exception ee) {} - - } - } - } - return httpPort; - } - private int getPort() { - - return httpPort; - } - @Before - public void startJetty() throws Exception - { - - - QueuedThreadPool threadPool = new QueuedThreadPool(); - if (maxThreads < threadPool.getMinThreads()) { - // logger.warn("JobDriver", jobid, - // "Invalid value for jetty MaxThreads("+maxThreads+") - it should be greater or equal to "+threadPool.getMinThreads()+". Defaulting to jettyMaxThreads="+threadPool.getMaxThreads()); - threadPool.setMaxThreads(threadPool.getMinThreads()); - } else { - threadPool.setMaxThreads(maxThreads); - } - - server = new Server(threadPool); - - // Server connector - ServerConnector connector = new ServerConnector(server); - connector.setPort(getJettyPort()); - server.setConnectors(new Connector[] { connector }); - System.out.println("launching Jetty on Port:"+connector.getPort()); - ServletContextHandler context = new ServletContextHandler( - ServletContextHandler.SESSIONS); - context.setContextPath("/"); - server.setHandler(context); - - context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/"+app); - - - server.start(); - System.out.println("Jetty Started - Waiting for Messages ..."); - } - - @After - public void stopJetty() - { - try - { - server.stop(); + private Server server; + + private final static String app = "test"; + + private int httpPort = 12222; + + private int maxThreads = 20; + + private ThreadLocal<HashMap<Long, XStream>> localXStream = new ThreadLocal<HashMap<Long, XStream>>() { + @Override + protected HashMap<Long, XStream> initialValue() { + return new HashMap<Long, XStream>(); + } + }; + + private int getJettyPort() { + while (true) { + ServerSocket socket = null; + try { + socket = new ServerSocket(httpPort); + break; + } catch (IOException e) { + httpPort++; + } finally { + if (socket != null) { + try { + socket.close(); + } catch (Exception ee) { + } + } - catch (Exception e) - { - e.printStackTrace(); + } + } + return httpPort; + } + + private int getPort() { + + return httpPort; + } + + @Before + public void startJetty() throws Exception { + + QueuedThreadPool threadPool = new QueuedThreadPool(); + if (maxThreads < threadPool.getMinThreads()) { + // logger.warn("JobDriver", jobid, + // "Invalid value for jetty MaxThreads("+maxThreads+") - it should be greater or equal to + // "+threadPool.getMinThreads()+". Defaulting to + // jettyMaxThreads="+threadPool.getMaxThreads()); + threadPool.setMaxThreads(threadPool.getMinThreads()); + } else { + threadPool.setMaxThreads(maxThreads); + } + + server = new Server(threadPool); + + // Server connector + ServerConnector connector = new ServerConnector(server); + connector.setPort(getJettyPort()); + server.setConnectors(new Connector[] { connector }); + System.out.println("launching Jetty on Port:" + connector.getPort()); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + server.setHandler(context); + + context.addServlet(new ServletHolder(new TaskHandlerServlet()), "/" + app); + + server.start(); + System.out.println("Jetty Started - Waiting for Messages ..."); + } + + @After + public void stopJetty() { + try { + server.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + System.out.println("Jetty Stopped"); + } + + private void wait(DefaultRegistryClient registryClient) { + synchronized (registryClient) { + try { + registryClient.wait(5 * 1000); + + } catch (InterruptedException e) { + } + } + } + + @Test + public void testTransportBasicConnectivity() throws Exception { + int scaleout = 12; + ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app); + DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl); + HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout); + transport.initialize(); + // String response = transport.getWork("Test"); + // System.out.println("Test Received Response:"+response); + + // assertThat("Response Code", http.getResponseCode(), (equal((HttpStatus.OK_200))); + } + + @Test + public void testTransportIOException() throws Exception { + localXStream.get().clear(); + if (localXStream.get().get(Thread.currentThread().getId()) == null) { + localXStream.get().put(Thread.currentThread().getId(), new XStream(new DomDriver())); + } + + System.out.println(".... Test::testTransportIOException"); + int scaleout = 12; + ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app); + DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl); + HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout); + transport.initialize(); + System.setProperty("MockHttpPostError", ERROR.IOException.name()); + IMetaTaskTransaction transaction = new MetaTaskTransaction(); + // transport.dispatch(XStreamUtils.marshall(transaction),localXStream); + transport.dispatch(localXStream.get().get(Thread.currentThread().getId()).toXML(transaction), + localXStream); + + wait(registryClient); + } + + @Test + public void testTransportNoRoutToHostException() throws Exception { + localXStream.get().clear(); + if (localXStream.get().get(Thread.currentThread().getId()) == null) { + localXStream.get().put(Thread.currentThread().getId(), new XStream(new DomDriver())); + } + + System.out.println(".... Test::testTransportNoRoutToHostException"); + int scaleout = 12; + ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app); + DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl); + HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout); + transport.initialize(); + System.setProperty("MockHttpPostError", ERROR.NoRouteToHostException.name()); + IMetaTaskTransaction transaction = new MetaTaskTransaction(); + // transport.dispatch(XStreamUtils.marshall(transaction),localXStream); + transport.dispatch(localXStream.get().get(Thread.currentThread().getId()).toXML(transaction), + localXStream); + wait(registryClient); + + } + + @Test + public void testTransportURISyntaxException() throws Exception { + System.out.println(".... Test::testTransportURISyntaxException"); + localXStream.get().clear(); + if (localXStream.get().get(Thread.currentThread().getId()) == null) { + localXStream.get().put(Thread.currentThread().getId(), new XStream(new DomDriver())); + } + + int scaleout = 12; + ITargetURI targetUrl = new HttpTargetURI("http://localhost:" + getPort() + "/" + app); + DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl); + HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout); + transport.initialize(); + System.setProperty("MockHttpPostError", ERROR.URISyntaxException.name()); + IMetaTaskTransaction transaction = new MetaTaskTransaction(); + // transport.dispatch(XStreamUtils.marshall(transaction), localXStream); + transport.dispatch(localXStream.get().get(Thread.currentThread().getId()).toXML(transaction), + localXStream); + wait(registryClient); + + } + + public class TaskHandlerServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + + public TaskHandlerServlet() { + } + + protected void doPost(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + try { + System.out.println("Handling HTTP Post Request"); + // long post_stime = System.nanoTime(); + StringBuilder sb = new StringBuilder(); + BufferedReader reader = request.getReader(); + String line; + while ((line = reader.readLine()) != null) { + sb.append(line); } - System.out.println("Jetty Stopped"); + String content = sb.toString().trim(); + + // System.out.println( "Http Request Body:::"+content); + + String nodeIP = request.getHeader("IP"); + String nodeName = request.getHeader("Hostname"); + String threadID = request.getHeader("ThreadID"); + String pid = request.getHeader("PID"); + System.out.println("Sender ID:::Node IP" + nodeIP + " Node Name:" + nodeName + " PID:" + pid + + " ThreadID:" + threadID); + IMetaTaskTransaction transaction = (IMetaTaskTransaction) XStreamUtils.unmarshall(content); + transaction.setMetaTask(new MetaTask(1, "", null)); + transaction.getMetaTask().setUserSpaceTask("Test Message"); + String serializedResponse = XStreamUtils.marshall(transaction); + // System.out.println(serializedResponse); + response.getWriter().write(serializedResponse); + } catch (Throwable e) { + e.printStackTrace(); + throw new ServletException(e); + } } - private void wait(DefaultRegistryClient registryClient) { - synchronized(registryClient) { - try { - registryClient.wait(5*1000); - - } catch( InterruptedException e) {} - } - } - @Test - public void testTransportBasicConnectivity() throws Exception - { - int scaleout = 12; - ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app); - DefaultRegistryClient registryClient = - new DefaultRegistryClient(targetUrl); - HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout); - transport.initialize(); - //String response = transport.getWork("Test"); - //System.out.println("Test Received Response:"+response); - -// assertThat("Response Code", http.getResponseCode(), (equal((HttpStatus.OK_200))); - } - @Test - public void testTransportIOException() throws Exception - { - System.out.println(".... Test::testTransportIOException"); - int scaleout = 12; - ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app); - DefaultRegistryClient registryClient = - new DefaultRegistryClient(targetUrl); - HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout); - transport.initialize(); - System.setProperty("MockHttpPostError", ERROR.IOException.name()); - IMetaTaskTransaction transaction = new MetaTaskTransaction(); - transport.dispatch(XStreamUtils.marshall(transaction)); - - wait(registryClient); - } - @Test - public void testTransportNoRoutToHostException() throws Exception - { - System.out.println(".... Test::testTransportNoRoutToHostException"); - int scaleout = 12; - ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app); - DefaultRegistryClient registryClient = - new DefaultRegistryClient(targetUrl); - HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout); - transport.initialize(); - System.setProperty("MockHttpPostError", ERROR.NoRouteToHostException.name()); - IMetaTaskTransaction transaction = new MetaTaskTransaction(); - transport.dispatch(XStreamUtils.marshall(transaction)); - wait(registryClient); - - } - @Test - public void testTransportURISyntaxException() throws Exception - { - System.out.println(".... Test::testTransportURISyntaxException"); - int scaleout = 12; - ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+getPort()+"/"+app); - DefaultRegistryClient registryClient = - new DefaultRegistryClient(targetUrl); - HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout); - transport.initialize(); - System.setProperty("MockHttpPostError", ERROR.URISyntaxException.name()); - IMetaTaskTransaction transaction = new MetaTaskTransaction(); - transport.dispatch(XStreamUtils.marshall(transaction)); - wait(registryClient); - - } - public class TaskHandlerServlet extends HttpServlet { - private static final long serialVersionUID = 1L; - - public TaskHandlerServlet() { - } - - protected void doPost(HttpServletRequest request, - HttpServletResponse response) throws ServletException, - IOException { - try { - System.out.println("Handling HTTP Post Request"); - //long post_stime = System.nanoTime(); - StringBuilder sb = new StringBuilder(); - BufferedReader reader = request.getReader(); - String line; - while ((line = reader.readLine()) != null) { - sb.append(line); - } - String content = sb.toString().trim(); - - //System.out.println( "Http Request Body:::"+content); - - - String nodeIP = request.getHeader("IP"); - String nodeName = request.getHeader("Hostname"); - String threadID = request.getHeader("ThreadID"); - String pid = request.getHeader("PID"); - System.out.println( "Sender ID:::Node IP"+nodeIP+" Node Name:"+nodeName+" PID:"+pid+" ThreadID:"+threadID); - IMetaTaskTransaction transaction = (IMetaTaskTransaction)XStreamUtils.unmarshall(content); - transaction.setMetaTask(new MetaTask(1,"",null)); - transaction.getMetaTask().setUserSpaceTask("Test Message"); - String serializedResponse = XStreamUtils.marshall(transaction); - //System.out.println(serializedResponse); - response.getWriter().write(serializedResponse); - } catch (Throwable e) { - e.printStackTrace(); - throw new ServletException(e); - } - } - } + } }
