Repository: aries-jax-rs-whiteboard Updated Branches: refs/heads/master 1d41c82d6 -> 7e1c84bed
Bring SSE changes from CXF 3.2.5-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/7183bfd9 Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/7183bfd9 Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/7183bfd9 Branch: refs/heads/master Commit: 7183bfd91f89ac99c84e711c309218cdda0c5638 Parents: 1d41c82 Author: Carlos Sierra <[email protected]> Authored: Fri Jun 1 10:16:55 2018 +0200 Committer: Carlos Sierra <[email protected]> Committed: Fri Jun 1 14:36:39 2018 +0200 ---------------------------------------------------------------------- .../cxf/sse/OutboundSseEventBodyWriter.java | 31 +-- .../internal/cxf/sse/OutboundSseEventImpl.java | 31 +-- .../internal/cxf/sse/SseBroadcasterImpl.java | 80 ++++---- .../internal/cxf/sse/SseContextProvider.java | 31 +-- .../cxf/sse/SseEventSinkContextProvider.java | 86 ++------ .../internal/cxf/sse/SseEventSinkImpl.java | 197 ++++++++++--------- .../rs/whiteboard/internal/cxf/sse/SseImpl.java | 31 +-- 7 files changed, 227 insertions(+), 260 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java index 954b8c5..32869e6 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventBodyWriter.java @@ -1,20 +1,21 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse; import java.io.IOException; @@ -137,4 +138,4 @@ public class OutboundSseEventBodyWriter implements MessageBodyWriter<OutboundSse MediaType mediaType) { return -1; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java index 2b495e8..85e9e5b 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/OutboundSseEventImpl.java @@ -1,20 +1,21 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse; import java.lang.reflect.Type; @@ -178,4 +179,4 @@ public final class OutboundSseEventImpl implements OutboundSseEvent { public Object getData() { return data; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java index 4ead14d..eee4d70 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseBroadcasterImpl.java @@ -1,20 +1,21 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse; import java.io.IOException; @@ -24,6 +25,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -36,22 +38,18 @@ import javax.ws.rs.sse.SseEventSink; public class SseBroadcasterImpl implements SseBroadcaster { private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>(); - - private final Set<Consumer<SseEventSink>> closers = - new CopyOnWriteArraySet<>(); - - private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = - new CopyOnWriteArraySet<>(); + private final Set<Consumer<SseEventSink>> closers = new CopyOnWriteArraySet<>(); + private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners = new CopyOnWriteArraySet<>(); + private final AtomicBoolean closed = new AtomicBoolean(false); @Override public void register(SseEventSink sink) { - if (closed) throw new IllegalStateException("Already closed"); + assertNotClosed(); - SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink; + final SseEventSinkImpl sinkImpl = (SseEventSinkImpl)sink; + final AsyncContext ctx = sinkImpl.getAsyncContext(); - AsyncContext asyncContext = sinkImpl.getAsyncContext(); - - asyncContext.addListener(new AsyncListener() { + ctx.addListener(new AsyncListener() { @Override public void onComplete(AsyncEvent asyncEvent) throws IOException { subscribers.remove(sink); @@ -78,16 +76,14 @@ public class SseBroadcasterImpl implements SseBroadcaster { @Override public CompletionStage<?> broadcast(OutboundSseEvent event) { - if (closed) throw new IllegalStateException("Already closed"); + assertNotClosed(); final Collection<CompletableFuture<?>> futures = new ArrayList<>(); - for (SseEventSink sink: subscribers) { try { futures.add(sink.send(event).toCompletableFuture()); } catch (final Exception ex) { - exceptioners.forEach( - exceptioner -> exceptioner.accept(sink, ex)); + exceptioners.forEach(exceptioner -> exceptioner.accept(sink, ex)); } } @@ -96,27 +92,29 @@ public class SseBroadcasterImpl implements SseBroadcaster { @Override public void onClose(Consumer<SseEventSink> subscriber) { - if (closed) throw new IllegalStateException("Already closed"); - + assertNotClosed(); closers.add(subscriber); } @Override public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) { - if (closed) throw new IllegalStateException("Already closed"); - + assertNotClosed(); exceptioners.add(exceptioner); } @Override public void close() { - closed = true; - - subscribers.forEach(subscriber -> { - subscriber.close(); - closers.forEach(closer -> closer.accept(subscriber)); - }); + if (closed.compareAndSet(false, true)) { + subscribers.forEach(subscriber -> { + subscriber.close(); + closers.forEach(closer -> closer.accept(subscriber)); + }); + } } - private volatile boolean closed; -} \ No newline at end of file + private void assertNotClosed() { + if (closed.get()) { + throw new IllegalStateException("The SSE broadcaster is already closed"); + } + } +} http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java index 054765f..6c4b695 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseContextProvider.java @@ -1,20 +1,21 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse; import javax.ws.rs.sse.Sse; @@ -27,4 +28,4 @@ public class SseContextProvider implements ContextProvider<Sse> { public Sse createContext(Message message) { return new SseImpl(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java index 0e8ad9f..3240fe5 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkContextProvider.java @@ -1,41 +1,35 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse; -import javax.servlet.AsyncContext; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.ext.MessageBodyWriter; import javax.ws.rs.sse.OutboundSseEvent; import javax.ws.rs.sse.SseEventSink; -import org.apache.cxf.interceptor.Fault; -import org.apache.cxf.interceptor.Interceptor; import org.apache.cxf.jaxrs.ext.ContextProvider; +import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; import org.apache.cxf.jaxrs.provider.ServerProviderFactory; import org.apache.cxf.message.Message; -import org.apache.cxf.phase.Phase; -import org.apache.cxf.phase.PhaseInterceptor; import org.apache.cxf.transport.http.AbstractHTTPDestination; -import java.util.Collection; -import java.util.Collections; -import java.util.Set; - public class SseEventSinkContextProvider implements ContextProvider<SseEventSink> { @Override @@ -48,51 +42,7 @@ public class SseEventSinkContextProvider implements ContextProvider<SseEventSink final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter( ServerProviderFactory.getInstance(message), message.getExchange()); - AsyncContext ctx = request.startAsync(); - ctx.setTimeout(0); - - message.getInterceptorChain().add(new SuspendPhaseInterceptor()); - - return new SseEventSinkImpl(writer, ctx); - } - - private static class SuspendPhaseInterceptor - implements PhaseInterceptor<Message> { - - @Override - public Set<String> getAfter() { - return Collections.emptySet(); - } - - @Override - public Set<String> getBefore() { - return Collections.singleton( - "org.apache.cxf.interceptor.OutgoingChainInterceptor"); - } - - @Override - public String getId() { - return "SSE SUSPEND"; - } - - @Override - public String getPhase() { - return Phase.POST_INVOKE; - } - - @Override - public Collection<PhaseInterceptor<? extends Message>> getAdditionalInterceptors() { - return Collections.emptySet(); - } - - @Override - public void handleMessage(Message message) throws Fault { - message.getInterceptorChain().suspend(); - } - - @Override - public void handleFault(Message message) { - } - + final AsyncResponseImpl async = new AsyncResponseImpl(message); + return new SseEventSinkImpl(writer, async, request.getAsyncContext()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java index b29ad20..cdcacb1 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseEventSinkImpl.java @@ -1,31 +1,36 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse; import java.lang.annotation.Annotation; -import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.logging.Logger; import javax.servlet.AsyncContext; -import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.ext.MessageBodyWriter; import javax.ws.rs.sse.OutboundSseEvent; import javax.ws.rs.sse.SseEventSink; @@ -33,114 +38,124 @@ import javax.ws.rs.sse.SseEventSink; import org.apache.cxf.common.logging.LogUtils; public class SseEventSinkImpl implements SseEventSink { + private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation [] {}; private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class); - - public AsyncContext getAsyncContext() { - return ctx; - } + private static final int BUFFER_SIZE = 10000; // buffering 10000 messages private final AsyncContext ctx; - - private static class QueuedEvent { - final OutboundSseEvent event; - final CompletableFuture<?> completion; - - public QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) { - this.event = event; - this.completion = completion; - } - } - private final MessageBodyWriter<OutboundSseEvent> writer; - private final Queue<QueuedEvent> queuedEvents; - private boolean dequeueing; - - private volatile boolean closed; + private final Queue<QueuedEvent> buffer; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicBoolean dispatching = new AtomicBoolean(false); - public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer, - final AsyncContext ctx) { + public SseEventSinkImpl(final MessageBodyWriter<OutboundSseEvent> writer, + final AsyncResponse async, final AsyncContext ctx) { + this.writer = writer; - this.queuedEvents = new LinkedList<>(); + this.buffer = new ArrayBlockingQueue<>(BUFFER_SIZE); this.ctx = ctx; if (ctx == null) { throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. " - + "Is the Servlet configured properly?"); + + "Is the Servlet configured properly?"); } - + ctx.getResponse().setContentType(OutboundSseEventBodyWriter.SERVER_SENT_EVENTS); } + public AsyncContext getAsyncContext() { + return ctx; + } + @Override public void close() { - if (!closed) { - closed = true; - + if (closed.compareAndSet(false, true)) { + // In case we are still dispatching, give the events the chance to be + // sent over to the consumers. The good example would be sent(event) call, + // immediately followed by the close() call. + if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) { + LOG.warning("There are still SSE events the queue which may not be delivered (closing now)"); + } + try { ctx.complete(); - } catch (final Exception ex) { - LOG.warning("Failed to close the AsyncContext cleanly: " - + ex.getMessage()); + } catch (final IllegalStateException ex) { + LOG.warning("Failed to close the AsyncContext cleanly: " + ex.getMessage()); } } } + private boolean awaitQueueToDrain(int timeout, TimeUnit unit) { + final long parkTime = unit.toNanos(timeout) / 20; + int attempt = 0; + + while (dispatching.get() && ++attempt < 20) { + LockSupport.parkNanos(parkTime); + } + + return buffer.isEmpty(); + } + + @Override + public boolean isClosed() { + return closed.get(); + } + @Override public CompletionStage<?> send(OutboundSseEvent event) { final CompletableFuture<?> future = new CompletableFuture<>(); - - if (!closed && writer != null) { - - boolean startDequeue; - synchronized (this) { - queuedEvents.offer(new QueuedEvent(event, future)); - if(dequeueing) { - startDequeue = false; - } else { - startDequeue = true; - dequeueing = true; + + if (!closed.get() && writer != null) { + if (buffer.offer(new QueuedEvent(event, future))) { + if (dispatching.compareAndSet(false, true)) { + ctx.start(this::dequeue); } + } else { + future.completeExceptionally(new IllegalStateException( + "The buffer is full (10000), unable to queue SSE event for send")); } - - if(startDequeue) { - ctx.start(this::dequeue); - } - } - else { - future.complete(null); + } else { + future.completeExceptionally(new IllegalStateException( + "The sink is already closed, unable to queue SSE event for send")); } return future; } - + private void dequeue() { - - for(;;) { - QueuedEvent qe; - synchronized (this) { - qe = queuedEvents.poll(); - if(qe == null) { - dequeueing = false; - break; - } - } - OutboundSseEvent event = qe.event; - CompletableFuture<?> future = qe.completion; - - try { - writer.writeTo(event, event.getClass(), event.getGenericType(), new Annotation [] {}, event.getMediaType(), null, ctx.getResponse().getOutputStream()); - ctx.getResponse().flushBuffer(); - future.complete(null); - - } catch (final Exception ex) { - future.completeExceptionally(ex); - } - - } + try { + while (true) { + final QueuedEvent qeuedEvent = buffer.poll(); + + // Nothing queued, release the thread + if (qeuedEvent == null) { + break; + } + + final OutboundSseEvent event = qeuedEvent.event; + final CompletableFuture<?> future = qeuedEvent.completion; + + try { + writer.writeTo(event, event.getClass(), event.getGenericType(), EMPTY_ANNOTATIONS, + event.getMediaType(), null, ctx.getResponse().getOutputStream()); + ctx.getResponse().flushBuffer(); + future.complete(null); + } catch (final Exception ex) { + future.completeExceptionally(ex); + } + } + } finally { + dispatching.set(false); + } } - @Override - public boolean isClosed() { - return closed; + private static class QueuedEvent { + private final OutboundSseEvent event; + private final CompletableFuture<?> completion; + + QueuedEvent(OutboundSseEvent event, CompletableFuture<?> completion) { + this.event = event; + this.completion = completion; + } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/7183bfd9/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java ---------------------------------------------------------------------- diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java index b9ce54f..b558120 100644 --- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java +++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/cxf/sse/SseImpl.java @@ -1,20 +1,21 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.aries.jax.rs.whiteboard.internal.cxf.sse; import javax.ws.rs.sse.OutboundSseEvent.Builder; @@ -34,4 +35,4 @@ class SseImpl implements Sse { public SseBroadcaster newBroadcaster() { return new SseBroadcasterImpl(); } -} \ No newline at end of file +}
