Repository: aries-jax-rs-whiteboard
Updated Branches:
  refs/heads/master 7e1c84bed -> a24f688dd


Bring AsyncResponseImpl from CXF 3.2.5-SNAPSHOT

It has changed for SSE


Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/a24f688d
Tree: 
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/a24f688d
Diff: 
http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/a24f688d

Branch: refs/heads/master
Commit: a24f688dd49ede97bc7460b0be2312e742f0b7ab
Parents: 7e1c84b
Author: Carlos Sierra <[email protected]>
Authored: Fri Jun 1 15:25:22 2018 +0200
Committer: Carlos Sierra <[email protected]>
Committed: Fri Jun 1 15:25:22 2018 +0200

----------------------------------------------------------------------
 .../cxf/jaxrs/impl/AsyncResponseImpl.java       | 321 +++++++++++++++++++
 1 file changed, 321 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/a24f688d/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
----------------------------------------------------------------------
diff --git 
a/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
 
b/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
new file mode 100644
index 0000000..eb3a8d2
--- /dev/null
+++ 
b/jax-rs.whiteboard/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.ServiceUnavailableException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.CompletionCallback;
+import javax.ws.rs.container.ConnectionCallback;
+import javax.ws.rs.container.TimeoutHandler;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationCallback;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.jaxrs.utils.HttpUtils;
+import org.apache.cxf.message.Message;
+
+
+public class AsyncResponseImpl implements AsyncResponse, ContinuationCallback {
+
+    private Continuation cont;
+    private Message inMessage;
+    private TimeoutHandler timeoutHandler;
+    private volatile boolean initialSuspend;
+    private volatile boolean cancelled;
+    private volatile boolean done;
+    private volatile boolean resumedByApplication;
+    private volatile Long pendingTimeout;
+
+    private List<CompletionCallback> completionCallbacks = new 
LinkedList<CompletionCallback>();
+    private List<ConnectionCallback> connectionCallbacks = new 
LinkedList<ConnectionCallback>();
+    private Throwable unmappedThrowable;
+
+    public AsyncResponseImpl(Message inMessage) {
+        inMessage.put(AsyncResponse.class, this);
+        inMessage.getExchange().put(ContinuationCallback.class, this);
+        this.inMessage = inMessage;
+
+        initContinuation();
+    }
+
+    @Override
+    public boolean resume(Object response) {
+        return doResume(response);
+    }
+
+    @Override
+    public boolean resume(Throwable response) {
+        return doResume(response);
+    }
+
+    private boolean isCancelledOrNotSuspended() {
+        return isCancelled() || !isSuspended();
+    }
+
+    private boolean doResume(Object response) {
+        if (isCancelledOrNotSuspended()) {
+            return false;
+        }
+        return doResumeFinal(response);
+    }
+    private synchronized boolean doResumeFinal(Object response) {
+        inMessage.getExchange().put(AsyncResponse.class, this);
+        cont.setObject(response);
+        resumedByApplication = true;
+        if (!initialSuspend) {
+            cont.resume();
+        } else {
+            initialSuspend = false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean cancel() {
+        return doCancel(null);
+    }
+
+    @Override
+    public boolean cancel(int retryAfter) {
+        return doCancel(Integer.toString(retryAfter));
+    }
+
+    @Override
+    public boolean cancel(Date retryAfter) {
+        return doCancel(HttpUtils.getHttpDateFormat().format(retryAfter));
+    }
+
+    private boolean doCancel(String retryAfterHeader) {
+        if (cancelled) {
+            return true;
+        }
+        if (!isSuspended()) {
+            return false;
+        }
+
+        cancelled = true;
+        ResponseBuilder rb = Response.status(503);
+        if (retryAfterHeader != null) {
+            rb.header(HttpHeaders.RETRY_AFTER, retryAfterHeader);
+        }
+        doResumeFinal(rb.build());
+        return cancelled;
+    }
+
+    @Override
+    public boolean isSuspended() {
+        if (cancelled || resumedByApplication) {
+            return false;
+        }
+        return initialSuspend || cont.isPending();
+    }
+
+    @Override
+    public synchronized boolean isCancelled() {
+        return cancelled;
+    }
+
+    @Override
+    public boolean isDone() {
+        return done;
+    }
+
+    @Override
+    public synchronized boolean setTimeout(long time, TimeUnit unit) throws 
IllegalStateException {
+        if (isCancelledOrNotSuspended()) {
+            return false;
+        }
+        setAsyncResponseOnExchange();
+        long timeout = TimeUnit.MILLISECONDS.convert(time, unit);
+        initialSuspend = false;
+        if (!cont.isPending()) {
+            cont.suspend(timeout);
+        } else {
+            pendingTimeout = timeout;
+            cont.resume();
+        }
+        return true;
+    }
+
+    private void setAsyncResponseOnExchange() {
+        inMessage.getExchange().put(AsyncResponse.class, this);
+    }
+
+    @Override
+    public void setTimeoutHandler(TimeoutHandler handler) {
+        timeoutHandler = handler;
+    }
+
+    @Override
+    public Collection<Class<?>> register(Class<?> callback) throws 
NullPointerException {
+        return register(callback, new Class<?>[]{}).get(callback);
+    }
+
+    @Override
+    public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, 
Class<?>... callbacks)
+        throws NullPointerException {
+        try {
+            Object[] extraCallbacks = new Object[callbacks.length];
+            for (int i = 0; i < callbacks.length; i++) {
+                extraCallbacks[i] = callbacks[i].newInstance();
+            }
+            return register(callback.newInstance(), extraCallbacks);
+        } catch (NullPointerException e) {
+            throw e;
+        } catch (Throwable t) {
+            return Collections.emptyMap();
+        }
+
+    }
+
+    @Override
+    public Collection<Class<?>> register(Object callback) throws 
NullPointerException {
+        return register(callback, new Object[]{}).get(callback.getClass());
+    }
+
+    @Override
+    public Map<Class<?>, Collection<Class<?>>> register(Object callback, 
Object... callbacks)
+        throws NullPointerException {
+        Map<Class<?>, Collection<Class<?>>> map =
+            new HashMap<Class<?>, Collection<Class<?>>>();
+
+        Object[] allCallbacks = new Object[1 + callbacks.length];
+        allCallbacks[0] = callback;
+        System.arraycopy(callbacks, 0, allCallbacks, 1, callbacks.length);
+
+        for (int i = 0; i < allCallbacks.length; i++) {
+            if (allCallbacks[i] == null) {
+                throw new NullPointerException();
+            }
+            Class<?> callbackCls = allCallbacks[i].getClass();
+            Collection<Class<?>> knownCallbacks = map.get(callbackCls);
+            if (knownCallbacks == null) {
+                knownCallbacks = new HashSet<Class<?>>();
+                map.put(callbackCls, knownCallbacks);
+            }
+
+            if (allCallbacks[i] instanceof CompletionCallback) {
+                knownCallbacks.add(CompletionCallback.class);
+                completionCallbacks.add((CompletionCallback)allCallbacks[i]);
+            } else if (allCallbacks[i] instanceof ConnectionCallback) {
+                knownCallbacks.add(ConnectionCallback.class);
+                connectionCallbacks.add((ConnectionCallback)allCallbacks[i]);
+            }
+        }
+        return map;
+    }
+
+    @Override
+    public void onComplete() {
+        done = true;
+        updateCompletionCallbacks(unmappedThrowable);
+    }
+
+    @Override
+    public void onError(Throwable error) {
+        updateCompletionCallbacks(error);
+    }
+
+    private void updateCompletionCallbacks(Throwable error) {
+        Throwable actualError = error instanceof Fault ? 
((Fault)error).getCause() : error;
+        for (CompletionCallback completionCallback : completionCallbacks) {
+            completionCallback.onComplete(actualError);
+        }
+    }
+
+    @Override
+    public void onDisconnect() {
+        for (ConnectionCallback connectionCallback : connectionCallbacks) {
+            connectionCallback.onDisconnect(this);
+        }
+    }
+
+    public synchronized boolean suspendContinuationIfNeeded() {
+        if (!resumedByApplication && !isDone() && !cont.isPending() && 
!cont.isResumed()) {
+            cont.suspend(AsyncResponse.NO_TIMEOUT);
+            initialSuspend = false;
+            return true;
+        }
+        return false;
+    }
+
+    @SuppressWarnings("resource") // Response that is built here shouldn't be 
closed here
+    public Object getResponseObject() {
+        Object obj = cont.getObject();
+        if (!(obj instanceof Response) && !(obj instanceof Throwable)) {
+            if (obj == null) {
+                obj = Response.noContent().build();
+            } else {
+                obj = Response.ok().entity(obj).build();
+            }
+        }
+        return obj;
+    }
+
+    public boolean isResumedByApplication() {
+        return resumedByApplication;
+    }
+
+    public synchronized void handleTimeout() {
+        if (!resumedByApplication) {
+            if (pendingTimeout != null) {
+                setAsyncResponseOnExchange();
+                cont.suspend(pendingTimeout);
+                pendingTimeout = null;
+            } else if (timeoutHandler != null) {
+                timeoutHandler.handleTimeout(this);
+            } else {
+                cont.setObject(new ServiceUnavailableException());
+            }
+        }
+    }
+
+    private void initContinuation() {
+        ContinuationProvider provider =
+            
(ContinuationProvider)inMessage.get(ContinuationProvider.class.getName());
+        cont = provider.getContinuation();
+        initialSuspend = true;
+    }
+
+    public void prepareContinuation() {
+        initContinuation();
+    }
+
+    public void setUnmappedThrowable(Throwable t) {
+        unmappedThrowable = t;
+    }
+    public void reset() {
+        cont.reset();
+    }
+
+}

Reply via email to