Author: fhanik Date: Fri Jul 6 16:01:36 2012 New Revision: 1358287 URL: http://svn.apache.org/viewvc?rev=1358287&view=rev Log: Add in test for write error
Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java Modified: tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java?rev=1358287&r1=1358286&r2=1358287&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java (original) +++ tomcat/trunk/java/org/apache/coyote/AsyncStateMachine.java Fri Jul 6 16:01:36 2012 @@ -296,7 +296,8 @@ public class AsyncStateMachine<S> { public synchronized void asyncError() { if (state == AsyncState.DISPATCHED || - state == AsyncState.TIMING_OUT) { + state == AsyncState.TIMING_OUT || + state == AsyncState.READ_WRITE_OP) { state = AsyncState.ERROR; } else { throw new IllegalStateException( Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1358287&r1=1358286&r2=1358287&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original) +++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Fri Jul 6 16:01:36 2012 @@ -16,7 +16,12 @@ */ package org.apache.catalina.nonblocking; +import java.io.BufferedInputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -133,6 +138,69 @@ public class TestNonBlockingAPI extends Assert.assertEquals(HttpServletResponse.SC_OK, rc); } + + @Test + public void testNonBlockingWriteError() throws Exception { + String bind = "localhost"; + // Configure a context with digest auth and a single protected resource + Tomcat tomcat = getTomcatInstance(); + // Must have a real docBase - just use temp + StandardContext ctx = (StandardContext) tomcat.addContext("", System.getProperty("java.io.tmpdir")); + + NBWriteServlet servlet = new NBWriteServlet(); + String servletName = NBWriteServlet.class.getName(); + Wrapper servletWrapper = tomcat.addServlet(ctx, servletName, servlet); + ctx.addServletMapping("/", servletName); + tomcat.getConnector().setProperty("socket.txBufSize", "1024"); + tomcat.getConnector().setProperty("address", bind); + System.out.println(tomcat.getConnector().getProperty("address")); + tomcat.start(); + + Map<String, List<String>> resHeaders = new HashMap<String, List<String>>(); + ByteChunk slowReader = new ByteChunk(); + slowReader.setLimit(1); // FIXME BUFFER IS BROKEN, 0 doesn't work + slowReader.setByteOutputChannel(new ByteOutputChannel() { + long counter = 0; + long delta = 0; + + @Override + public void realWriteBytes(byte[] cbuf, int off, int len) throws IOException { + try { + if (len == 0) + return; + counter += len; + delta += len; + if (counter > bytesToDownload) { + System.out.println("ERROR Downloaded more than expected ERROR"); + } else if (counter == bytesToDownload) { + System.out.println("Download complete(" + bytesToDownload + " bytes)"); + // } else if (counter > (1966086)) { + // System.out.println("Download almost complete, missing bytes ("+counter+")"); + } else if (delta > (bytesToDownload / 16)) { + System.out.println("Read " + counter + " bytes."); + delta = 0; + Thread.currentThread().sleep(500); + } + } catch (Exception x) { + throw new IOException(x); + } + } + }); + int rc = postUrlWithDisconnect(true, new DataWriter(0), "http://" + bind + ":" + getPort() + "/", slowReader, resHeaders, + null); + slowReader.flushBuffer(); + Assert.assertEquals(HttpServletResponse.SC_OK, rc); + try { + //allow the listeners to finish up + Thread.sleep(1000); + } catch (Exception e) { + } + Assert.assertTrue("Error listener should have been invoked.", servlet.wlistener.onErrorInvoked); + + } + + + public static class DataWriter implements BytesStreamer { final int max = 5; int count = 0; @@ -180,8 +248,8 @@ public class TestNonBlockingAPI extends } @WebServlet(asyncSupported = true) - public static class NBReadServlet extends TesterServlet { - + public class NBReadServlet extends TesterServlet { + public volatile TestReadListener listener; @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { // step 1 - start async @@ -203,7 +271,7 @@ public class TestNonBlockingAPI extends @Override public void onError(AsyncEvent event) throws IOException { - System.out.println("onError"); + System.out.println("AsyncListener.onError"); } @@ -215,11 +283,11 @@ public class TestNonBlockingAPI extends }); // step 2 - notify on read ServletInputStream in = req.getInputStream(); - ReadListener rlist = new TestReadListener(actx); - in.setReadListener(rlist); + listener = new TestReadListener(actx); + in.setReadListener(listener); while (in.isReady()) { - rlist.onDataAvailable(); + listener.onDataAvailable(); } // step 3 - notify that we wish to read // ServletOutputStream out = resp.getOutputStream(); @@ -227,56 +295,13 @@ public class TestNonBlockingAPI extends } - private class TestReadListener implements ReadListener { - AsyncContext ctx; - - public TestReadListener(AsyncContext ctx) { - this.ctx = ctx; - } - - @Override - public void onDataAvailable() { - try { - ServletInputStream in = ctx.getRequest().getInputStream(); - int avail = 0; - String s = ""; - while ((avail = in.dataAvailable()) > 0) { - byte[] b = new byte[avail]; - in.read(b); - s += new String(b); - } - System.out.println(s); - if ("FINISHED".equals(s)) { - ctx.complete(); - ctx.getResponse().getWriter().print("OK"); - } else { - in.isReady(); - } - } catch (Exception x) { - x.printStackTrace(); - ctx.complete(); - } - - } - - @Override - public void onAllDataRead() { - System.out.println("onAllDataRead"); - - } - - @Override - public void onError(Throwable throwable) { - System.out.println("onError"); - throwable.printStackTrace(); - - } - } } @WebServlet(asyncSupported = true) - public static class NBWriteServlet extends TesterServlet { + public class NBWriteServlet extends TesterServlet { + public volatile TestWriteListener wlistener; + public volatile TestReadListener rlistener; @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { @@ -299,7 +324,7 @@ public class TestNonBlockingAPI extends @Override public void onError(AsyncEvent event) throws IOException { - System.out.println("onError"); + System.out.println("AsyncListener.onError"); } @@ -310,66 +335,179 @@ public class TestNonBlockingAPI extends } }); // step 2 - notify on read - // ServletInputStream in = req.getInputStream(); - // ReadListener rlist = new TestReadListener(actx); - // in.setReadListener(rlist); - // - // while (in.isReady()) { - // rlist.onDataAvailable(); - // } - // step 3 - notify that we wish to read + ServletInputStream in = req.getInputStream(); + rlistener = new TestReadListener(actx); + in.setReadListener(rlistener); ServletOutputStream out = resp.getOutputStream(); resp.setBufferSize(200 * 1024); - TestWriteListener listener = new TestWriteListener(actx); - out.setWriteListener(listener); - listener.onWritePossible(); + wlistener = new TestWriteListener(actx); + out.setWriteListener(wlistener); + wlistener.onWritePossible(); } - private class TestWriteListener implements WriteListener { - long chunk = 1024 * 1024; - AsyncContext ctx; - long bytesToDownload = TestNonBlockingAPI.bytesToDownload; - public TestWriteListener(AsyncContext ctx) { - this.ctx = ctx; + } + private class TestReadListener implements ReadListener { + AsyncContext ctx; + public volatile boolean onErrorInvoked = false; + + public TestReadListener(AsyncContext ctx) { + this.ctx = ctx; + } + + @Override + public void onDataAvailable() { + try { + ServletInputStream in = ctx.getRequest().getInputStream(); + int avail = 0; + String s = ""; + while ((avail = in.dataAvailable()) > 0) { + byte[] b = new byte[avail]; + in.read(b); + s += new String(b); + } + System.out.println(s); + if ("FINISHED".equals(s)) { + ctx.complete(); + ctx.getResponse().getWriter().print("OK"); + } else { + in.isReady(); + } + } catch (Exception x) { + x.printStackTrace(); + ctx.complete(); } - @Override - public void onWritePossible() { - System.out.println("onWritePossible"); - try { - long left = Math.max(bytesToDownload, 0); - long start = System.currentTimeMillis(); - long end = System.currentTimeMillis(); - long before = left; - while (left > 0 && ctx.getResponse().getOutputStream().canWrite()) { - byte[] b = new byte[(int) Math.min(chunk, bytesToDownload)]; - Arrays.fill(b, (byte) 'X'); - ctx.getResponse().getOutputStream().write(b); - bytesToDownload -= b.length; - left = Math.max(bytesToDownload, 0); - } - System.out - .println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left); - // only call complete if we have emptied the buffer - if (left == 0 && ctx.getResponse().getOutputStream().canWrite()) { - // it is illegal to call complete - // if there is a write in progress - ctx.complete(); + } + + @Override + public void onAllDataRead() { + System.out.println("onAllDataRead"); + + } + + @Override + public void onError(Throwable throwable) { + System.out.println("ReadListener.onError"); + throwable.printStackTrace(); + onErrorInvoked = true; + + } + } + + private class TestWriteListener implements WriteListener { + long chunk = 1024 * 1024; + AsyncContext ctx; + long bytesToDownload = TestNonBlockingAPI.bytesToDownload; + public volatile boolean onErrorInvoked = false; + + public TestWriteListener(AsyncContext ctx) { + this.ctx = ctx; + } + + @Override + public void onWritePossible() { + System.out.println("onWritePossible"); + try { + long left = Math.max(bytesToDownload, 0); + long start = System.currentTimeMillis(); + long end = System.currentTimeMillis(); + long before = left; + while (left > 0 && ctx.getResponse().getOutputStream().canWrite()) { + byte[] b = new byte[(int) Math.min(chunk, bytesToDownload)]; + Arrays.fill(b, (byte) 'X'); + ctx.getResponse().getOutputStream().write(b); + bytesToDownload -= b.length; + left = Math.max(bytesToDownload, 0); + } + System.out.println("Write took:" + (end - start) + " ms. Bytes before=" + before + " after=" + left); + // only call complete if we have emptied the buffer + if (left == 0 && ctx.getResponse().getOutputStream().canWrite()) { + // it is illegal to call complete + // if there is a write in progress + ctx.complete(); + } + } catch (Exception x) { + x.printStackTrace(); + } + + } + + @Override + public void onError(Throwable throwable) { + System.out.println("WriteListener.onError"); + throwable.printStackTrace(); + onErrorInvoked = true; + } + + } + + public static int postUrlWithDisconnect(boolean stream, BytesStreamer streamer, String path, ByteChunk out, + Map<String, List<String>> reqHead, Map<String, List<String>> resHead) throws IOException { + + URL url = new URL(path); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setDoOutput(true); + connection.setReadTimeout(1000000); + if (reqHead != null) { + for (Map.Entry<String, List<String>> entry : reqHead.entrySet()) { + StringBuilder valueList = new StringBuilder(); + for (String value : entry.getValue()) { + if (valueList.length() > 0) { + valueList.append(','); } - } catch (Exception x) { - x.printStackTrace(); + valueList.append(value); } + connection.setRequestProperty(entry.getKey(), valueList.toString()); + } + } + if (streamer != null && stream) { + if (streamer.getLength() > 0) { + connection.setFixedLengthStreamingMode(streamer.getLength()); + } else { + connection.setChunkedStreamingMode(1024); + } + } + + connection.connect(); + // Write the request body + OutputStream os = null; + try { + os = connection.getOutputStream(); + while (streamer != null && streamer.available() > 0) { + byte[] next = streamer.next(); + os.write(next); + os.flush(); } - @Override - public void onError(Throwable throwable) { - System.out.println("onError"); - throwable.printStackTrace(); + } finally { + if (os != null) { + try { + os.close(); + } catch (IOException ioe) { + // Ignore + } } + } + int rc = connection.getResponseCode(); + if (resHead != null) { + Map<String, List<String>> head = connection.getHeaderFields(); + resHead.putAll(head); } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + if (rc == HttpServletResponse.SC_OK) { + connection.getInputStream().close(); + os.close(); + connection.disconnect(); + } + return rc; } + + } Modified: tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java?rev=1358287&r1=1358286&r2=1358287&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java (original) +++ tomcat/trunk/test/org/apache/catalina/startup/TomcatBaseTest.java Fri Jul 6 16:01:36 2012 @@ -290,7 +290,7 @@ public abstract class TomcatBaseTest ext public static int postUrl(final byte[] body, String path, ByteChunk out, Map<String, List<String>> reqHead, Map<String, List<String>> resHead) throws IOException { - BytesStreamer s = new BytesStreamer() { + BytesStreamer s = new BytesStreamer() { boolean done = false; @Override public byte[] next() { --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org