This is an automated email from the ASF dual-hosted git repository. markt-asf pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit 2e57812b8099dc525308cef3f25fb449b9955284 Author: Mark Thomas <[email protected]> AuthorDate: Wed Jun 3 19:27:44 2026 +0100 Fix async handling --- .../apache/catalina/valves/PersistentValve.java | 125 ++++++++++++--------- 1 file changed, 75 insertions(+), 50 deletions(-) diff --git a/java/org/apache/catalina/valves/PersistentValve.java b/java/org/apache/catalina/valves/PersistentValve.java index b286ad8076..1bfca4d000 100644 --- a/java/org/apache/catalina/valves/PersistentValve.java +++ b/java/org/apache/catalina/valves/PersistentValve.java @@ -113,6 +113,8 @@ public class PersistentValve extends ValveBase { @Override public void invoke(Request request, Response response) throws IOException, ServletException { + containerLog.error("invoke"); + // request without session if (isRequestWithoutSession(request.getDecodedRequestURI())) { if (containerLog.isTraceEnabled()) { @@ -131,69 +133,81 @@ public class PersistentValve extends ValveBase { return; } + boolean asyncOnEntry = request.isAsync(); + + containerLog.error("invoke: asyncOnEntry [" + asyncOnEntry + "]"); + String sessionId = request.getRequestedSessionId(); UsageCountingSemaphore semaphore = null; boolean mustReleaseSemaphore = true; try { - // Acquire the per session semaphore - if (sessionId != null) { - semaphore = sessionToSemaphoreMap.compute(sessionId, - (k, v) -> v == null ? new UsageCountingSemaphore(semaphoreFairness) : v.incrementUsageCount()); - if (semaphoreBlockOnAcquire) { - if (semaphoreAcquireUninterruptibly) { - semaphore.acquireUninterruptibly(); + /* + * If the request was in asynchronous mode when it entered the Valve, the semaphore was acquired during the + * original request where asynchronous processing started and does not need to be acquired again. + */ + if (!asyncOnEntry) { + /* + * Acquire the per session semaphore. + */ + if (sessionId != null) { + semaphore = sessionToSemaphoreMap.compute(sessionId, + (k, v) -> v == null ? new UsageCountingSemaphore(semaphoreFairness) : v.incrementUsageCount()); + if (semaphoreBlockOnAcquire) { + if (semaphoreAcquireUninterruptibly) { + semaphore.acquireUninterruptibly(); + } else { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + mustReleaseSemaphore = false; + onSemaphoreNotAcquired(request, response); + if (containerLog.isDebugEnabled()) { + containerLog.debug(sm.getString("persistentValve.acquireInterrupted", + request.getDecodedRequestURI())); + } + return; + } + } } else { - try { - semaphore.acquire(); - } catch (InterruptedException e) { + if (!semaphore.tryAcquire()) { mustReleaseSemaphore = false; onSemaphoreNotAcquired(request, response); if (containerLog.isDebugEnabled()) { - containerLog.debug(sm.getString("persistentValve.acquireInterrupted", - request.getDecodedRequestURI())); + containerLog.debug( + sm.getString("persistentValve.acquireFailed", request.getDecodedRequestURI())); } return; } } - } else { - if (!semaphore.tryAcquire()) { - mustReleaseSemaphore = false; - onSemaphoreNotAcquired(request, response); - if (containerLog.isDebugEnabled()) { - containerLog.debug( - sm.getString("persistentValve.acquireFailed", request.getDecodedRequestURI())); - } - return; - } } - } - // Update the session last access time for our session (if any) - Manager manager = context.getManager(); - if (sessionId != null && manager instanceof StoreManager) { - Store store = ((StoreManager) manager).getStore(); - if (store != null) { - Session session = null; - try { - session = store.load(sessionId); - } catch (Exception e) { - containerLog.error(sm.getString("persistentValve.sessionLoadFail", sessionId)); - } - if (session != null) { - if (!session.isValid() || isSessionStale(session, System.currentTimeMillis())) { - if (containerLog.isTraceEnabled()) { - containerLog.trace("session swapped in is invalid or expired"); + // Update the session last access time for our session (if any) + Manager manager = context.getManager(); + if (sessionId != null && manager instanceof StoreManager) { + Store store = ((StoreManager) manager).getStore(); + if (store != null) { + Session session = null; + try { + session = store.load(sessionId); + } catch (Exception e) { + containerLog.error(sm.getString("persistentValve.sessionLoadFail", sessionId)); + } + if (session != null) { + if (!session.isValid() || isSessionStale(session, System.currentTimeMillis())) { + if (containerLog.isTraceEnabled()) { + containerLog.trace("session swapped in is invalid or expired"); + } + session.expire(); + store.remove(sessionId); + } else { + session.setManager(manager); + // session.setId(sessionId); Only if new ??? + manager.add(session); + // ((StandardSession)session).activate(); + session.access(); + session.endAccess(); } - session.expire(); - store.remove(sessionId); - } else { - session.setManager(manager); - // session.setId(sessionId); Only if new ??? - manager.add(session); - // ((StandardSession)session).activate(); - session.access(); - session.endAccess(); } } } @@ -205,15 +219,21 @@ public class PersistentValve extends ValveBase { // Ask the next valve to process the request. getNext().invoke(request, response); } finally { + containerLog.error("invoke - finally isAsync: [" + request.isAsync() + "]"); if (request.isAsync()) { /* * Need to continue to hold the semaphore until asynchronous processing is complete. Register a listener * that will release the Semaphore once asynchronous processing is complete. Also need to delay session * persistence until completion of the asynchronous processing. + * + * The listener must only be added once so it is only added when the request was not in asynchronous + * mode on entry. */ - AsyncContext asyncContext = request.getAsyncContext(); - asyncContext.addListener( - new StoreSessionAsyncListener(request, context, sessionId, semaphore, mustReleaseSemaphore)); + if (!asyncOnEntry) { + AsyncContext asyncContext = request.getAsyncContext(); + asyncContext.addListener( + new StoreSessionAsyncListener(request, context, sessionId, semaphore, mustReleaseSemaphore)); + } } else { storeSession(request, context, sessionId, semaphore, mustReleaseSemaphore); } @@ -510,21 +530,26 @@ public class PersistentValve extends ValveBase { @Override public void onComplete(AsyncEvent event) throws IOException { + containerLog.error("listener - on complete"); storeSession(request, context, originalSessionId, semaphore, mustReleaseSemaphore); } @Override public void onTimeout(AsyncEvent event) throws IOException { + containerLog.error("listener - on timeout"); // NO-OP. } @Override public void onError(AsyncEvent event) throws IOException { + containerLog.error("listener - on error"); // NO-OP. } @Override public void onStartAsync(AsyncEvent event) throws IOException { + containerLog.error("listener - on start async"); + event.getAsyncContext().addListener(this); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
