Repository: aries-jax-rs-whiteboard Updated Branches: refs/heads/master 7e1c84bed -> a24f688dd
Bring AsyncResponseImpl from CXF 3.2.5-SNAPSHOT It has changed for SSE Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/a24f688d Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/a24f688d Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/a24f688d Branch: refs/heads/master Commit: a24f688dd49ede97bc7460b0be2312e742f0b7ab Parents: 7e1c84b Author: Carlos Sierra <[email protected]> Authored: Fri Jun 1 15:25:22 2018 +0200 Committer: Carlos Sierra <[email protected]> Committed: Fri Jun 1 15:25:22 2018 +0200 ---------------------------------------------------------------------- .../cxf/jaxrs/impl/AsyncResponseImpl.java | 321 +++++++++++++++++++ 1 file changed, 321 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/a24f688d/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java new file mode 100644 index 0000000..eb3a8d2 --- /dev/null +++ b/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.impl; + +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.ServiceUnavailableException; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.CompletionCallback; +import javax.ws.rs.container.ConnectionCallback; +import javax.ws.rs.container.TimeoutHandler; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; + +import org.apache.cxf.continuations.Continuation; +import org.apache.cxf.continuations.ContinuationCallback; +import org.apache.cxf.continuations.ContinuationProvider; +import org.apache.cxf.interceptor.Fault; +import org.apache.cxf.jaxrs.utils.HttpUtils; +import org.apache.cxf.message.Message; + + +public class AsyncResponseImpl implements AsyncResponse, ContinuationCallback { + + private Continuation cont; + private Message inMessage; + private TimeoutHandler timeoutHandler; + private volatile boolean initialSuspend; + private volatile boolean cancelled; + private volatile boolean done; + private volatile boolean resumedByApplication; + private volatile Long pendingTimeout; + + private List<CompletionCallback> completionCallbacks = new LinkedList<CompletionCallback>(); + private List<ConnectionCallback> connectionCallbacks = new LinkedList<ConnectionCallback>(); + private Throwable unmappedThrowable; + + public AsyncResponseImpl(Message inMessage) { + inMessage.put(AsyncResponse.class, this); + inMessage.getExchange().put(ContinuationCallback.class, this); + this.inMessage = inMessage; + + initContinuation(); + } + + @Override + public boolean resume(Object response) { + return doResume(response); + } + + @Override + public boolean resume(Throwable response) { + return doResume(response); + } + + private boolean isCancelledOrNotSuspended() { + return isCancelled() || !isSuspended(); + } + + private boolean doResume(Object response) { + if (isCancelledOrNotSuspended()) { + return false; + } + return doResumeFinal(response); + } + private synchronized boolean doResumeFinal(Object response) { + inMessage.getExchange().put(AsyncResponse.class, this); + cont.setObject(response); + resumedByApplication = true; + if (!initialSuspend) { + cont.resume(); + } else { + initialSuspend = false; + } + return true; + } + + @Override + public boolean cancel() { + return doCancel(null); + } + + @Override + public boolean cancel(int retryAfter) { + return doCancel(Integer.toString(retryAfter)); + } + + @Override + public boolean cancel(Date retryAfter) { + return doCancel(HttpUtils.getHttpDateFormat().format(retryAfter)); + } + + private boolean doCancel(String retryAfterHeader) { + if (cancelled) { + return true; + } + if (!isSuspended()) { + return false; + } + + cancelled = true; + ResponseBuilder rb = Response.status(503); + if (retryAfterHeader != null) { + rb.header(HttpHeaders.RETRY_AFTER, retryAfterHeader); + } + doResumeFinal(rb.build()); + return cancelled; + } + + @Override + public boolean isSuspended() { + if (cancelled || resumedByApplication) { + return false; + } + return initialSuspend || cont.isPending(); + } + + @Override + public synchronized boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return done; + } + + @Override + public synchronized boolean setTimeout(long time, TimeUnit unit) throws IllegalStateException { + if (isCancelledOrNotSuspended()) { + return false; + } + setAsyncResponseOnExchange(); + long timeout = TimeUnit.MILLISECONDS.convert(time, unit); + initialSuspend = false; + if (!cont.isPending()) { + cont.suspend(timeout); + } else { + pendingTimeout = timeout; + cont.resume(); + } + return true; + } + + private void setAsyncResponseOnExchange() { + inMessage.getExchange().put(AsyncResponse.class, this); + } + + @Override + public void setTimeoutHandler(TimeoutHandler handler) { + timeoutHandler = handler; + } + + @Override + public Collection<Class<?>> register(Class<?> callback) throws NullPointerException { + return register(callback, new Class<?>[]{}).get(callback); + } + + @Override + public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, Class<?>... callbacks) + throws NullPointerException { + try { + Object[] extraCallbacks = new Object[callbacks.length]; + for (int i = 0; i < callbacks.length; i++) { + extraCallbacks[i] = callbacks[i].newInstance(); + } + return register(callback.newInstance(), extraCallbacks); + } catch (NullPointerException e) { + throw e; + } catch (Throwable t) { + return Collections.emptyMap(); + } + + } + + @Override + public Collection<Class<?>> register(Object callback) throws NullPointerException { + return register(callback, new Object[]{}).get(callback.getClass()); + } + + @Override + public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... callbacks) + throws NullPointerException { + Map<Class<?>, Collection<Class<?>>> map = + new HashMap<Class<?>, Collection<Class<?>>>(); + + Object[] allCallbacks = new Object[1 + callbacks.length]; + allCallbacks[0] = callback; + System.arraycopy(callbacks, 0, allCallbacks, 1, callbacks.length); + + for (int i = 0; i < allCallbacks.length; i++) { + if (allCallbacks[i] == null) { + throw new NullPointerException(); + } + Class<?> callbackCls = allCallbacks[i].getClass(); + Collection<Class<?>> knownCallbacks = map.get(callbackCls); + if (knownCallbacks == null) { + knownCallbacks = new HashSet<Class<?>>(); + map.put(callbackCls, knownCallbacks); + } + + if (allCallbacks[i] instanceof CompletionCallback) { + knownCallbacks.add(CompletionCallback.class); + completionCallbacks.add((CompletionCallback)allCallbacks[i]); + } else if (allCallbacks[i] instanceof ConnectionCallback) { + knownCallbacks.add(ConnectionCallback.class); + connectionCallbacks.add((ConnectionCallback)allCallbacks[i]); + } + } + return map; + } + + @Override + public void onComplete() { + done = true; + updateCompletionCallbacks(unmappedThrowable); + } + + @Override + public void onError(Throwable error) { + updateCompletionCallbacks(error); + } + + private void updateCompletionCallbacks(Throwable error) { + Throwable actualError = error instanceof Fault ? ((Fault)error).getCause() : error; + for (CompletionCallback completionCallback : completionCallbacks) { + completionCallback.onComplete(actualError); + } + } + + @Override + public void onDisconnect() { + for (ConnectionCallback connectionCallback : connectionCallbacks) { + connectionCallback.onDisconnect(this); + } + } + + public synchronized boolean suspendContinuationIfNeeded() { + if (!resumedByApplication && !isDone() && !cont.isPending() && !cont.isResumed()) { + cont.suspend(AsyncResponse.NO_TIMEOUT); + initialSuspend = false; + return true; + } + return false; + } + + @SuppressWarnings("resource") // Response that is built here shouldn't be closed here + public Object getResponseObject() { + Object obj = cont.getObject(); + if (!(obj instanceof Response) && !(obj instanceof Throwable)) { + if (obj == null) { + obj = Response.noContent().build(); + } else { + obj = Response.ok().entity(obj).build(); + } + } + return obj; + } + + public boolean isResumedByApplication() { + return resumedByApplication; + } + + public synchronized void handleTimeout() { + if (!resumedByApplication) { + if (pendingTimeout != null) { + setAsyncResponseOnExchange(); + cont.suspend(pendingTimeout); + pendingTimeout = null; + } else if (timeoutHandler != null) { + timeoutHandler.handleTimeout(this); + } else { + cont.setObject(new ServiceUnavailableException()); + } + } + } + + private void initContinuation() { + ContinuationProvider provider = + (ContinuationProvider)inMessage.get(ContinuationProvider.class.getName()); + cont = provider.getContinuation(); + initialSuspend = true; + } + + public void prepareContinuation() { + initContinuation(); + } + + public void setUnmappedThrowable(Throwable t) { + unmappedThrowable = t; + } + public void reset() { + cont.reset(); + } + +}
