I think AsyncListener.onError() is a terminal state/call.
The AsyncContext is no longer valid once that is called.
The response is committed/closed as well.
Moving to the complete() state isn't even possible.
(Note: I could be wrong in this assumption)

Some other advice: don't rely on context.getRequest and context.getResponse
always being there and available.
The lifecycle of an AsyncContext, and WriteListener can be longer than the
lifecycle of a Request/Response.
It's very possible for the Request/Response to be recycled / cleaned up
before the AsyncContext and WriteListener are done.

Yank them out into variables and use them that way.
For example, your call to context.getRequest.getServletContext.log is just
asking for problems (especially in an onError).



Joakim Erdfelt / [email protected]

On Wed, Sep 9, 2015 at 10:54 AM, Xiaodong Wang <[email protected]>
wrote:

> I try to use the asynchronous servlet with Jetty 9, but I
> get IllegalStateException from HttpOutput class. The same code works well
> with Tomcat.
>
> This is my test code (in Scala).
>
> TestServers.scala
> ------------------------------------------------------------------------------------------------------------------------
> package playground.webserver
>
> import java.io.File
> import javax.servlet.Servlet
>
> import org.apache.catalina.startup.Tomcat
> import org.eclipse.jetty.server.Server
> import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
> ;
>
> trait TestServer {
>   val port: Int
>   def start: Unit
>   def stop: Unit
>   def addServlet(servlet: Servlet, contextPath: String)
> }
>
> class TomcatTestServer(val port: Int) extends TestServer {
>   private val tomcat = new Tomcat()
>   tomcat.setPort(port)
>
>   override def start: Unit = tomcat.start()
>
>   override def stop: Unit = tomcat.stop()
>
>   override def addServlet(servlet: Servlet, contextPath: String): Unit = {
>   val base = new File(System.getProperty("java.io.tmpdir"));
>   val context = tomcat.addContext(contextPath, base.getAbsolutePath());
>
>   tomcat.addServlet(contextPath, contextPath, servlet)
>   context.addServletMapping("/", contextPath)
>   }
> }
>
> class JettyTestServer(val port: Int) extends TestServer {
>   private val jetty = new Server(port)
>   private val context = new
> ServletContextHandler(ServletContextHandler.SESSIONS)
>   context.setContextPath("/")
>   jetty.setHandler(context)
>
>   override def start: Unit = jetty.start()
>
>   override def stop: Unit = jetty.stop()
>
>   override def addServlet(servlet: Servlet, contextPath: String): Unit = {
>   context.addServlet(new ServletHolder(servlet), contextPath)
>   }
> }
>
>
>
> AsyncIoTest.scala------------------------------------------------------------------------------------------------------------------------------------
> package playground.webserver
>
> import java.net.ServerSocket
> import javax.servlet.http.{HttpServlet, HttpServletRequest,
> HttpServletResponse}
> import javax.servlet.{AsyncContext, WriteListener}
>
> import org.apache.http.client.methods.HttpGet
> import org.apache.http.impl.client.HttpClients
> import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
>
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
>
> abstract class AsyncIoTest extends FunSpec with Matchers with
> BeforeAndAfterAll {
>   private val data = new Array[Byte](1024 * 1024 * 128)
>
>   *class DataWriteListener(context: AsyncContext) extends WriteListener {*
> *    private[this] var pos = 0*
>
> *    override def onError(t: Throwable): Unit = {*
> *      context.getRequest.getServletContext.log("Async Error", t)*
> *      context.complete()*
> *    }*
>
> *    override def onWritePossible(): Unit = {*
> *      val out = context.getResponse.getOutputStream*
> *      while (out.isReady && pos < data.length) {*
> *        val toWrite = math.min(1024, data.length - pos)*
> *        out.write(data, pos, toWrite)*
> *        pos += toWrite*
> *      }*
>
> *      if (pos >= data.length) {*
> *        context.complete()*
> *      }*
> *    }*
> *  }*
>
>   *class AsyncServlet extends HttpServlet {*
> *    override def doGet(req: HttpServletRequest, resp:
> HttpServletResponse): Unit = {*
> *      req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);*
>
> *      resp.setStatus(200)*
> *      resp.setContentLength(data.length)*
>
> *      val async = req.startAsync()*
> *      val out = resp.getOutputStream*
>
> *      out.setWriteListener(new DataWriteListener(async))*
> *    }*
> *  }*
>
>   class SyncServlet extends HttpServlet {
>     override def doGet(req: HttpServletRequest, resp:
> HttpServletResponse): Unit = {
>       resp.setStatus(200)
>       resp.setContentLength(data.length)
>       val out = resp.getOutputStream
>       var pos = 0
>       while(pos < data.length) {
>         out.write(data, pos, 1024)
>         pos += 1024
>       }
>     }
>   }
>
>   protected val server: TestServer
>   protected lazy val port: Int = findAvailablePort()
>   private val requestNum = 100
>
>   private def findAvailablePort(): Int = {
>     val serverSocket = new ServerSocket(0)
>     val port = serverSocket.getLocalPort()
>     serverSocket.close()
>
>     port
>   }
>
>   describe("Sync IO") {
>     it("trigger some traffic") {
>       testWithUrl(s"http://localhost:${port}/sync";)
>     }
>   }
>
>   describe("Async IO") {
>     it("trigger some traffic") {
>       testWithUrl(s"http://localhost:${port}/async";)
>     }
>   }
>
>   override protected def beforeAll(): Unit = {
>     server.addServlet(new AsyncServlet, "/async")
>     server.addServlet(new SyncServlet, "/sync")
>     server.start
>   }
>
>
>   override protected def afterAll(): Unit = {
>     server.stop
>   }
>
>   private def testWithUrl(url: String): Unit = {
>     val start = System.currentTimeMillis
>
>     val futures = (1 to requestNum).map { i =>
>       Future {
>         val total = readData(url)
>         total should be(1024 * 1024 * 128)
>         total
>       }
>     }
>
>     Await.result(Future.sequence(futures), Duration.Inf)
>
>     println(s"Total millis used: ${System.currentTimeMillis - start}")
>   }
>
>   private def readData(url: String): Long = {
>     val httpclient = HttpClients.createDefault()
>     val httpGet = new HttpGet(url)
>     val resp = httpclient.execute(httpGet)
>     val is = resp.getEntity.getContent
>     val buf = new Array[Byte](1024 * 1024)
>     var read = 0
>     var total = 0L
>     while(read != -1) {
>       total += read
>       read = is.read(buf)
>     }
>
>     resp.close()
>     total
>   }
> }
>
> class AsyncIoWithJettyTest extends AsyncIoTest {
>   override protected val server: TestServer = new JettyTestServer(port)
> }
>
> class AsyncIoWithTomcatTest extends AsyncIoTest {
>   override protected val server: TestServer = new TomcatTestServer(port)
> }
>
>
> This is the error I got.
> [qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
> [qtp990355670-32] WARN org.eclipse.jetty.server.HttpChannel -
> //localhost:51809/async
> java.lang.IllegalStateException
> at
> org.eclipse.jetty.server.HttpOutput$AsyncICB.onCompleteSuccess(HttpOutput.java:990)
> at
> org.eclipse.jetty.server.HttpOutput$AsyncWrite.onCompleteSuccess(HttpOutput.java:1126)
> at
> org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
> at
> org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
> at
> org.eclipse.jetty.server.HttpConnection$SendCallback.onCompleteSuccess(HttpConnection.java:747)
> at
> org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
> at
> org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
> at
> org.eclipse.jetty.io.WriteFlusher$PendingState.complete(WriteFlusher.java:269)
> at org.eclipse.jetty.io.WriteFlusher.completeWrite(WriteFlusher.java:394)
> at
> org.eclipse.jetty.io.SelectChannelEndPoint$3.run(SelectChannelEndPoint.java:89)
> at
> org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
> at
> org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
> at java.lang.Thread.run(Thread.java:745)
> [qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
> Unexpected thread death:
> org.eclipse.jetty.util.thread.QueuedThreadPool$3@26b0a9dd in
> qtp990355670{STARTED,8<=20<=200,i=9,q=0}
> java.lang.NullPointerException
> at
> playground.webserver.AsyncIoTest$DataWriteListener.onError(AsyncIoTest.scala:22)
> at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:201)
> at org.eclipse.jetty.server.Response.closeOutput(Response.java:987)
> at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:412)
> at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:262)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
> at java.lang.Thread.run(Thread.java:745)
> ......
>
>
>
>
>
>
>
>
>
> _______________________________________________
> jetty-users mailing list
> [email protected]
> To change your delivery options, retrieve your password, or unsubscribe
> from this list, visit
> https://dev.eclipse.org/mailman/listinfo/jetty-users
>
_______________________________________________
jetty-users mailing list
[email protected]
To change your delivery options, retrieve your password, or unsubscribe from 
this list, visit
https://dev.eclipse.org/mailman/listinfo/jetty-users

Reply via email to