This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push: new 5bc57a82fe Fix BZ 68227. Ensure onComplete() is called if onError() calls dispatch 5bc57a82fe is described below commit 5bc57a82feef407e7b0843c9ce3f00feec2fc701 Author: Mark Thomas <ma...@apache.org> AuthorDate: Thu Nov 30 12:50:58 2023 +0000 Fix BZ 68227. Ensure onComplete() is called if onError() calls dispatch --- java/org/apache/coyote/AsyncStateMachine.java | 1 + .../TestAsyncContextImplListenerOnComplete.java | 182 +++++++++++++++++++++ webapps/docs/changelog.xml | 5 + 3 files changed, 188 insertions(+) diff --git a/java/org/apache/coyote/AsyncStateMachine.java b/java/org/apache/coyote/AsyncStateMachine.java index be322c6426..42108086e6 100644 --- a/java/org/apache/coyote/AsyncStateMachine.java +++ b/java/org/apache/coyote/AsyncStateMachine.java @@ -292,6 +292,7 @@ class AsyncStateMachine { if (processor.getErrorState().isIoAllowed() && processor.flushBufferedWrite()) { return SocketState.LONG; } + asyncCtxt.fireOnComplete(); updateState(AsyncState.DISPATCHED); asyncCtxt.decrementInProgressAsyncCount(); return SocketState.ASYNC_END; diff --git a/test/org/apache/catalina/core/TestAsyncContextImplListenerOnComplete.java b/test/org/apache/catalina/core/TestAsyncContextImplListenerOnComplete.java new file mode 100644 index 0000000000..0e18405cda --- /dev/null +++ b/test/org/apache/catalina/core/TestAsyncContextImplListenerOnComplete.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.core; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.servlet.AsyncContext; +import jakarta.servlet.AsyncEvent; +import jakarta.servlet.AsyncListener; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.catalina.Context; +import org.apache.catalina.Wrapper; +import org.apache.catalina.startup.Tomcat; +import org.apache.catalina.startup.TomcatBaseTest; + +public class TestAsyncContextImplListenerOnComplete extends TomcatBaseTest { + + /* + * https://bz.apache.org/bugzilla/show_bug.cgi?id=68227 + */ + @Test + public void testAfterNetworkErrorThenDispatch() throws Exception { + // Setup Tomcat instance + Tomcat tomcat = getTomcatInstance(); + + // No file system docBase required + Context ctx = tomcat.addContext("", null); + + // Latch to track that complete has been called + CountDownLatch completeLatch = new CountDownLatch(1); + + Wrapper servletWrapper = tomcat.addServlet("", "repro-servlet", new ReproServlet(completeLatch)); + servletWrapper.addMapping("/repro"); + servletWrapper.setAsyncSupported(true); + servletWrapper.setLoadOnStartup(1); + + ctx.addServletMappingDecoded("/", "repro-servlet"); + + tomcat.start(); + Thread.sleep(2000); + + triggerBrokenPipe(getPort()); + + Assert.assertTrue(completeLatch.await(30, TimeUnit.SECONDS)); + } + + + private void triggerBrokenPipe(int port) throws IOException, InterruptedException { + try (Socket socket = new Socket()) { + socket.setReceiveBufferSize(1); + socket.connect(new InetSocketAddress("localhost", port)); + + try (var writer = new OutputStreamWriter(socket.getOutputStream())) { + writer.write(""" + GET /repro + Accept: text/event-stream + + """); + writer.flush(); + } + Thread.sleep(1_000); + } + } + + + private static class ReproServlet extends jakarta.servlet.http.HttpServlet { + + private static final long serialVersionUID = 1L; + private final EventSource eventSource = new EventSource(); + + private final CountDownLatch completeLatch; + + ReproServlet(CountDownLatch completeLatch) { + this.completeLatch = completeLatch; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) { + resp.setContentType("text/event-stream"); + resp.setCharacterEncoding("UTF-8"); + resp.setStatus(200); + AsyncContext context = req.startAsync(); + context.addListener(new ReproAsyncListener()); + eventSource.add(context); + } + + private class ReproAsyncListener implements AsyncListener { + @Override + public void onComplete(AsyncEvent event) { + try { + eventSource.remove(event.getAsyncContext()); + } finally { + completeLatch.countDown(); + } + } + + @Override + public void onTimeout(AsyncEvent event) { + // Not expected + new AssertionError("onTimeout").printStackTrace(); + } + + @Override + public void onError(AsyncEvent event) { + event.getAsyncContext().dispatch(); // Mirroring Spring's behaviour + } + + @Override + public void onStartAsync(AsyncEvent event) { + // NO-OP + } + } + } + + + private static class EventSource { + + private final Set<AsyncContext> contexts = new HashSet<>(); + + private EventSource() { + Thread.ofVirtual() + .name(getClass().getSimpleName()) + .start(() -> { + while (true) { + try { + Thread.sleep(2000); + send("PING"); + } catch (InterruptedException e) { + System.out.println("Failed to sleep: " + e); + } + } + }); + } + + public void send(String message) { + for (AsyncContext context : contexts) { + try { + PrintWriter writer = context.getResponse().getWriter(); + writer.write("event: " + message + "\n\n"); + writer.flush(); + } catch (Exception e) { + System.out.println("Failed to send event: " + e); + } + } + } + + public void add(AsyncContext context) { + contexts.add(context); + } + + public void remove(AsyncContext context) { + contexts.remove(context); + } + } +} diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 04b77f6b9c..3eecf9f9b9 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -127,6 +127,11 @@ list of support locks when provided in response to a PROPFIND request was incorrectly XML escaped. (markt) </fix> + <fix> + <bug>68227</bug>: Ensure that <code>AsyncListener.onComplete()</code> is + called if <code>AsyncListener.onError()</code> calls + <code>AsyncContext.dispatch()</code>. (markt) + </fix> </changelog> </subsection> <subsection name="Coyote"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org