This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bc57a82fe Fix BZ 68227. Ensure onComplete() is called if onError() calls dispatch
5bc57a82fe is described below

commit 5bc57a82feef407e7b0843c9ce3f00feec2fc701
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Thu Nov 30 12:50:58 2023 +0000

    Fix BZ 68227. Ensure onComplete() is called if onError() calls dispatch
---
 java/org/apache/coyote/AsyncStateMachine.java      |   1 +
 .../TestAsyncContextImplListenerOnComplete.java    | 182 +++++++++++++++++++++
 webapps/docs/changelog.xml                         |   5 +
 3 files changed, 188 insertions(+)

diff --git a/java/org/apache/coyote/AsyncStateMachine.java b/java/org/apache/coyote/AsyncStateMachine.java
index be322c6426..42108086e6 100644
--- a/java/org/apache/coyote/AsyncStateMachine.java
+++ b/java/org/apache/coyote/AsyncStateMachine.java
@@ -292,6 +292,7 @@ class AsyncStateMachine {
             if (processor.getErrorState().isIoAllowed() && processor.flushBufferedWrite()) {
                 return SocketState.LONG;
             }
+            asyncCtxt.fireOnComplete();
             updateState(AsyncState.DISPATCHED);
             asyncCtxt.decrementInProgressAsyncCount();
             return SocketState.ASYNC_END;
diff --git a/test/org/apache/catalina/core/TestAsyncContextImplListenerOnComplete.java b/test/org/apache/catalina/core/TestAsyncContextImplListenerOnComplete.java
new file mode 100644
index 0000000000..0e18405cda
--- /dev/null
+++ b/test/org/apache/catalina/core/TestAsyncContextImplListenerOnComplete.java
@@ -0,0 +1,182 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.catalina.core;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import jakarta.servlet.AsyncContext;
+import jakarta.servlet.AsyncEvent;
+import jakarta.servlet.AsyncListener;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.Context;
+import org.apache.catalina.Wrapper;
+import org.apache.catalina.startup.Tomcat;
+import org.apache.catalina.startup.TomcatBaseTest;
+
+public class TestAsyncContextImplListenerOnComplete extends TomcatBaseTest {
+
+    /*
+     * https://bz.apache.org/bugzilla/show_bug.cgi?id=68227
+     */
+    @Test
+    public void testAfterNetworkErrorThenDispatch() throws Exception {
+        // Setup Tomcat instance
+        Tomcat tomcat = getTomcatInstance();
+
+        // No file system docBase required
+        Context ctx = tomcat.addContext("", null);
+
+        // Latch to track that complete has been called
+        CountDownLatch completeLatch = new CountDownLatch(1);
+
+        Wrapper servletWrapper = tomcat.addServlet("", "repro-servlet", new ReproServlet(completeLatch));
+        servletWrapper.addMapping("/repro");
+        servletWrapper.setAsyncSupported(true);
+        servletWrapper.setLoadOnStartup(1);
+
+        ctx.addServletMappingDecoded("/", "repro-servlet");
+
+        tomcat.start();
+        Thread.sleep(2000);
+
+        triggerBrokenPipe(getPort());
+
+        Assert.assertTrue(completeLatch.await(30, TimeUnit.SECONDS));
+    }
+
+
+    private void triggerBrokenPipe(int port) throws IOException, InterruptedException {
+        try (Socket socket = new Socket()) {
+            socket.setReceiveBufferSize(1);
+            socket.connect(new InetSocketAddress("localhost", port));
+
+            try (var writer = new OutputStreamWriter(socket.getOutputStream())) {
+                writer.write("""
+                    GET /repro
+                    Accept: text/event-stream
+
+                    """);
+                writer.flush();
+            }
+            Thread.sleep(1_000);
+        }
+    }
+
+
+    private static class ReproServlet extends jakarta.servlet.http.HttpServlet {
+
+        private static final long serialVersionUID = 1L;
+        private final EventSource eventSource = new EventSource();
+
+        private final CountDownLatch completeLatch;
+
+        ReproServlet(CountDownLatch completeLatch) {
+            this.completeLatch = completeLatch;
+        }
+
+        @Override
+        protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
+            resp.setContentType("text/event-stream");
+            resp.setCharacterEncoding("UTF-8");
+            resp.setStatus(200);
+            AsyncContext context = req.startAsync();
+            context.addListener(new ReproAsyncListener());
+            eventSource.add(context);
+        }
+
+        private class ReproAsyncListener implements AsyncListener {
+            @Override
+            public void onComplete(AsyncEvent event) {
+                try {
+                    eventSource.remove(event.getAsyncContext());
+                } finally {
+                    completeLatch.countDown();
+                }
+            }
+
+            @Override
+            public void onTimeout(AsyncEvent event) {
+                // Not expected
+                new AssertionError("onTimeout").printStackTrace();
+            }
+
+            @Override
+            public void onError(AsyncEvent event) {
+                event.getAsyncContext().dispatch(); // Mirroring Spring's behaviour
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent event) {
+                // NO-OP
+            }
+        }
+    }
+
+
+    private static class EventSource {
+
+        private final Set<AsyncContext> contexts = new HashSet<>();
+
+        private EventSource() {
+            Thread.ofVirtual()
+                .name(getClass().getSimpleName())
+                .start(() -> {
+                    while (true) {
+                        try {
+                            Thread.sleep(2000);
+                            send("PING");
+                        } catch (InterruptedException e) {
+                            System.out.println("Failed to sleep: " + e);
+                        }
+                    }
+                });
+        }
+
+        public void send(String message) {
+            for (AsyncContext context : contexts) {
+                try {
+                    PrintWriter writer = context.getResponse().getWriter();
+                    writer.write("event: " + message + "\n\n");
+                    writer.flush();
+                } catch (Exception e) {
+                    System.out.println("Failed to send event: " + e);
+                }
+            }
+        }
+
+        public void add(AsyncContext context) {
+            contexts.add(context);
+        }
+
+        public void remove(AsyncContext context) {
+            contexts.remove(context);
+        }
+    }
+}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 04b77f6b9c..3eecf9f9b9 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -127,6 +127,11 @@
         list of support locks when provided in response to a PROPFIND request
         was incorrectly XML escaped. (markt)
       </fix>
+      <fix>
+        <bug>68227</bug>: Ensure that <code>AsyncListener.onComplete()</code> is
+        called if <code>AsyncListener.onError()</code> calls
+        <code>AsyncContext.dispatch()</code>. (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Coyote">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to